You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/03/25 06:59:45 UTC

[1/3] spark git commit: [SPARK-14014][SQL] Integrate session catalog (attempt #2)

Repository: spark
Updated Branches:
  refs/heads/master 1c70b7650 -> 20ddf5fdd


http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index cc41224..92f424b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -222,7 +222,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
     }
-    sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
+    sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true)
   }
 
   test("overwriting") {
@@ -232,7 +232,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), data.map(Row.fromTuple))
     }
-    sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
+    sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true)
   }
 
   test("self-join") {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index fe44677..bdd3428 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -68,8 +68,12 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
   }
 
   override def afterAll(): Unit = {
-    orcTableDir.delete()
-    orcTableAsDir.delete()
+    try {
+      orcTableDir.delete()
+      orcTableAsDir.delete()
+    } finally {
+      super.afterAll()
+    }
   }
 
   test("create temporary orc table") {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index bb53179..b6fc61d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.DataSourceScan
 import org.apache.spark.sql.execution.command.ExecutedCommand
 import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
@@ -425,10 +426,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   }
 
   test("Caching converted data source Parquet Relations") {
-    val _catalog = sessionState.catalog
-    def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = {
+    def checkCached(tableIdentifier: TableIdentifier): Unit = {
       // Converted test_parquet should be cached.
-      sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+      sessionState.catalog.getCachedDataSourceTable(tableIdentifier) match {
         case null => fail("Converted test_parquet should be cached in the cache.")
         case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) => // OK
         case other =>
@@ -453,17 +453,17 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
       """.stripMargin)
 
-    var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet")
+    var tableIdentifier = TableIdentifier("test_insert_parquet", Some("default"))
 
     // First, make sure the converted test_parquet is not cached.
-    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
     // Table lookup will make the table cached.
     table("test_insert_parquet")
     checkCached(tableIdentifier)
     // For insert into non-partitioned table, we will do the conversion,
     // so the converted test_insert_parquet should be cached.
     invalidateTable("test_insert_parquet")
-    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
     sql(
       """
         |INSERT INTO TABLE test_insert_parquet
@@ -476,7 +476,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       sql("select a, b from jt").collect())
     // Invalidate the cache.
     invalidateTable("test_insert_parquet")
-    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
 
     // Create a partitioned table.
     sql(
@@ -493,8 +493,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
       """.stripMargin)
 
-    tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
-    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    tableIdentifier = TableIdentifier("test_parquet_partitioned_cache_test", Some("default"))
+    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -503,14 +503,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       """.stripMargin)
     // Right now, insert into a partitioned Parquet is not supported in data source Parquet.
     // So, we expect it is not cached.
-    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
         |PARTITION (`date`='2015-04-02')
         |select a, b from jt
       """.stripMargin)
-    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
 
     // Make sure we can cache the partitioned table.
     table("test_parquet_partitioned_cache_test")
@@ -526,7 +526,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         """.stripMargin).collect())
 
     invalidateTable("test_parquet_partitioned_cache_test")
-    assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
 
     dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
   }
@@ -700,6 +700,7 @@ abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with
   var partitionedTableDirWithKeyAndComplexTypes: File = null
 
   override def beforeAll(): Unit = {
+    super.beforeAll()
     partitionedTableDir = Utils.createTempDir()
     normalTableDir = Utils.createTempDir()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/3] spark git commit: [SPARK-14014][SQL] Integrate session catalog (attempt #2)

Posted by an...@apache.org.
http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 914f8e9..ca3ce43 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -28,6 +28,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
@@ -38,7 +39,7 @@ import org.apache.hadoop.hive.ql.parse.VariableSubstitution
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
@@ -52,6 +53,7 @@ import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
 import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
 import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._
 import org.apache.spark.sql.types._
@@ -67,7 +69,7 @@ private[hive] case class CurrentDatabase(ctx: HiveContext)
   override def foldable: Boolean = true
   override def nullable: Boolean = false
   override def eval(input: InternalRow): Any = {
-    UTF8String.fromString(ctx.metadataHive.currentDatabase)
+    UTF8String.fromString(ctx.sessionState.catalog.getCurrentDatabase)
   }
 }
 
@@ -81,15 +83,31 @@ class HiveContext private[hive](
     sc: SparkContext,
     cacheManager: CacheManager,
     listener: SQLListener,
-    @transient private val execHive: HiveClientImpl,
-    @transient private val metaHive: HiveClient,
-    isRootContext: Boolean)
-  extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging {
+    @transient private[hive] val executionHive: HiveClientImpl,
+    @transient private[hive] val metadataHive: HiveClient,
+    isRootContext: Boolean,
+    @transient private[sql] val hiveCatalog: HiveCatalog)
+  extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging {
   self =>
 
+  private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) {
+    this(
+      sc,
+      new CacheManager,
+      SQLContext.createListenerAndUI(sc),
+      execHive,
+      metaHive,
+      true,
+      new HiveCatalog(metaHive))
+  }
+
   def this(sc: SparkContext) = {
-    this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), null, null, true)
+    this(
+      sc,
+      HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
+      HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration))
   }
+
   def this(sc: JavaSparkContext) = this(sc.sc)
 
   import org.apache.spark.sql.hive.HiveContext._
@@ -106,9 +124,10 @@ class HiveContext private[hive](
       sc = sc,
       cacheManager = cacheManager,
       listener = listener,
-      execHive = executionHive.newSession(),
-      metaHive = metadataHive.newSession(),
-      isRootContext = false)
+      executionHive = executionHive.newSession(),
+      metadataHive = metadataHive.newSession(),
+      isRootContext = false,
+      hiveCatalog = hiveCatalog)
   }
 
   @transient
@@ -149,41 +168,6 @@ class HiveContext private[hive](
    */
   protected[sql] def convertCTAS: Boolean = getConf(CONVERT_CTAS)
 
-  /**
-   * The version of the hive client that will be used to communicate with the metastore.  Note that
-   * this does not necessarily need to be the same version of Hive that is used internally by
-   * Spark SQL for execution.
-   */
-  protected[hive] def hiveMetastoreVersion: String = getConf(HIVE_METASTORE_VERSION)
-
-  /**
-   * The location of the jars that should be used to instantiate the HiveMetastoreClient.  This
-   * property can be one of three options:
-   *  - a classpath in the standard format for both hive and hadoop.
-   *  - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
-   *              option is only valid when using the execution version of Hive.
-   *  - maven - download the correct version of hive on demand from maven.
-   */
-  protected[hive] def hiveMetastoreJars: String = getConf(HIVE_METASTORE_JARS)
-
-  /**
-   * A comma separated list of class prefixes that should be loaded using the classloader that
-   * is shared between Spark SQL and a specific version of Hive. An example of classes that should
-   * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
-   * to be shared are those that interact with classes that are already shared.  For example,
-   * custom appenders that are used by log4j.
-   */
-  protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] =
-    getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "")
-
-  /**
-   * A comma separated list of class prefixes that should explicitly be reloaded for each version
-   * of Hive that Spark SQL is communicating with.  For example, Hive UDFs that are declared in a
-   * prefix that typically would be shared (i.e. org.apache.spark.*)
-   */
-  protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] =
-    getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "")
-
   /*
    * hive thrift server use background spark sql thread pool to execute sql queries
    */
@@ -196,29 +180,6 @@ class HiveContext private[hive](
   protected[sql] lazy val substitutor = new VariableSubstitution()
 
   /**
-   * The copy of the hive client that is used for execution.  Currently this must always be
-   * Hive 13 as this is the version of Hive that is packaged with Spark SQL.  This copy of the
-   * client is used for execution related tasks like registering temporary functions or ensuring
-   * that the ThreadLocal SessionState is correctly populated.  This copy of Hive is *not* used
-   * for storing persistent metadata, and only point to a dummy metastore in a temporary directory.
-   */
-  @transient
-  protected[hive] lazy val executionHive: HiveClientImpl = if (execHive != null) {
-    execHive
-  } else {
-    logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
-    val loader = new IsolatedClientLoader(
-      version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
-      sparkConf = sc.conf,
-      execJars = Seq(),
-      hadoopConf = sc.hadoopConfiguration,
-      config = newTemporaryConfiguration(useInMemoryDerby = true),
-      isolationOn = false,
-      baseClassLoader = Utils.getContextOrSparkClassLoader)
-    loader.createClient().asInstanceOf[HiveClientImpl]
-  }
-
-  /**
    * Overrides default Hive configurations to avoid breaking changes to Spark SQL users.
    *  - allow SQL11 keywords to be used as identifiers
    */
@@ -228,111 +189,6 @@ class HiveContext private[hive](
 
   defaultOverrides()
 
-  /**
-   * The copy of the Hive client that is used to retrieve metadata from the Hive MetaStore.
-   * The version of the Hive client that is used here must match the metastore that is configured
-   * in the hive-site.xml file.
-   */
-  @transient
-  protected[hive] lazy val metadataHive: HiveClient = if (metaHive != null) {
-    metaHive
-  } else {
-    val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
-
-    // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
-    // into the isolated client loader
-    val metadataConf = new HiveConf(sc.hadoopConfiguration, classOf[HiveConf])
-
-    val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir")
-    logInfo("default warehouse location is " + defaultWarehouseLocation)
-
-    // `configure` goes second to override other settings.
-    val allConfig = metadataConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configure
-
-    val isolatedLoader = if (hiveMetastoreJars == "builtin") {
-      if (hiveExecutionVersion != hiveMetastoreVersion) {
-        throw new IllegalArgumentException(
-          "Builtin jars can only be used when hive execution version == hive metastore version. " +
-          s"Execution: ${hiveExecutionVersion} != Metastore: ${hiveMetastoreVersion}. " +
-          "Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
-          s"or change ${HIVE_METASTORE_VERSION.key} to $hiveExecutionVersion.")
-      }
-
-      // We recursively find all jars in the class loader chain,
-      // starting from the given classLoader.
-      def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
-        case null => Array.empty[URL]
-        case urlClassLoader: URLClassLoader =>
-          urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
-        case other => allJars(other.getParent)
-      }
-
-      val classLoader = Utils.getContextOrSparkClassLoader
-      val jars = allJars(classLoader)
-      if (jars.length == 0) {
-        throw new IllegalArgumentException(
-          "Unable to locate hive jars to connect to metastore. " +
-            "Please set spark.sql.hive.metastore.jars.")
-      }
-
-      logInfo(
-        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
-      new IsolatedClientLoader(
-        version = metaVersion,
-        sparkConf = sc.conf,
-        execJars = jars.toSeq,
-        hadoopConf = sc.hadoopConfiguration,
-        config = allConfig,
-        isolationOn = true,
-        barrierPrefixes = hiveMetastoreBarrierPrefixes,
-        sharedPrefixes = hiveMetastoreSharedPrefixes)
-    } else if (hiveMetastoreJars == "maven") {
-      // TODO: Support for loading the jars from an already downloaded location.
-      logInfo(
-        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
-      IsolatedClientLoader.forVersion(
-        hiveMetastoreVersion = hiveMetastoreVersion,
-        hadoopVersion = VersionInfo.getVersion,
-        sparkConf = sc.conf,
-        hadoopConf = sc.hadoopConfiguration,
-        config = allConfig,
-        barrierPrefixes = hiveMetastoreBarrierPrefixes,
-        sharedPrefixes = hiveMetastoreSharedPrefixes)
-    } else {
-      // Convert to files and expand any directories.
-      val jars =
-        hiveMetastoreJars
-          .split(File.pathSeparator)
-          .flatMap {
-            case path if new File(path).getName() == "*" =>
-              val files = new File(path).getParentFile().listFiles()
-              if (files == null) {
-                logWarning(s"Hive jar path '$path' does not exist.")
-                Nil
-              } else {
-                files.filter(_.getName().toLowerCase().endsWith(".jar"))
-              }
-            case path =>
-              new File(path) :: Nil
-          }
-          .map(_.toURI.toURL)
-
-      logInfo(
-        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
-          s"using ${jars.mkString(":")}")
-      new IsolatedClientLoader(
-        version = metaVersion,
-        sparkConf = sc.conf,
-        execJars = jars.toSeq,
-        hadoopConf = sc.hadoopConfiguration,
-        config = allConfig,
-        isolationOn = true,
-        barrierPrefixes = hiveMetastoreBarrierPrefixes,
-        sharedPrefixes = hiveMetastoreSharedPrefixes)
-    }
-    isolatedLoader.createClient()
-  }
-
   protected[sql] override def parseSql(sql: String): LogicalPlan = {
     executionHive.withHiveState {
       super.parseSql(substitutor.substitute(hiveconf, sql))
@@ -432,7 +288,7 @@ class HiveContext private[hive](
         // recorded in the Hive metastore.
         // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
         if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-          sessionState.catalog.client.alterTable(
+          sessionState.catalog.alterTable(
             relation.table.copy(
               properties = relation.table.properties +
                 (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
@@ -459,64 +315,10 @@ class HiveContext private[hive](
     setConf(entry.key, entry.stringConverter(value))
   }
 
-  /** Overridden by child classes that need to set configuration before the client init. */
-  protected def configure(): Map[String, String] = {
-    // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
-    // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.).  This breaks backwards-
-    // compatibility when users are trying to connecting to a Hive metastore of lower version,
-    // because these options are expected to be integral values in lower versions of Hive.
-    //
-    // Here we enumerate all time `ConfVar`s and convert their values to numeric strings according
-    // to their output time units.
-    Seq(
-      ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS,
-      ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS,
-      ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS,
-      ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS,
-      ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS,
-      ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS,
-      ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS,
-      ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS,
-      ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS,
-      ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS,
-      ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
-      ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS,
-      ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
-      ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS,
-      ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
-      ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS,
-      ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS,
-      ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS,
-      ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS,
-      ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS
-    ).map { case (confVar, unit) =>
-      confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString
-    }.toMap
-  }
-
   /**
    * SQLConf and HiveConf contracts:
    *
-   * 1. create a new SessionState for each HiveContext
+   * 1. create a new o.a.h.hive.ql.session.SessionState for each HiveContext
    * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
    *    SQLConf.  Additionally, any properties set by set() or a SET command inside sql() will be
    *    set in the SQLConf *as well as* in the HiveConf.
@@ -600,7 +402,7 @@ class HiveContext private[hive](
 }
 
 
-private[hive] object HiveContext {
+private[hive] object HiveContext extends Logging {
   /** The version of hive used internally by Spark SQL. */
   val hiveExecutionVersion: String = "1.2.1"
 
@@ -666,6 +468,242 @@ private[hive] object HiveContext {
     defaultValue = Some(true),
     doc = "When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
 
+  /**
+   * The version of the hive client that will be used to communicate with the metastore.  Note that
+   * this does not necessarily need to be the same version of Hive that is used internally by
+   * Spark SQL for execution.
+   */
+  private def hiveMetastoreVersion(conf: SQLConf): String = {
+    conf.getConf(HIVE_METASTORE_VERSION)
+  }
+
+  /**
+   * The location of the jars that should be used to instantiate the HiveMetastoreClient.  This
+   * property can be one of three options:
+   *  - a classpath in the standard format for both hive and hadoop.
+   *  - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
+   *              option is only valid when using the execution version of Hive.
+   *  - maven - download the correct version of hive on demand from maven.
+   */
+  private def hiveMetastoreJars(conf: SQLConf): String = {
+    conf.getConf(HIVE_METASTORE_JARS)
+  }
+
+  /**
+   * A comma separated list of class prefixes that should be loaded using the classloader that
+   * is shared between Spark SQL and a specific version of Hive. An example of classes that should
+   * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
+   * to be shared are those that interact with classes that are already shared.  For example,
+   * custom appenders that are used by log4j.
+   */
+  private def hiveMetastoreSharedPrefixes(conf: SQLConf): Seq[String] = {
+    conf.getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "")
+  }
+
+  /**
+   * A comma separated list of class prefixes that should explicitly be reloaded for each version
+   * of Hive that Spark SQL is communicating with.  For example, Hive UDFs that are declared in a
+   * prefix that typically would be shared (i.e. org.apache.spark.*)
+   */
+  private def hiveMetastoreBarrierPrefixes(conf: SQLConf): Seq[String] = {
+    conf.getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "")
+  }
+
+  /**
+   * Configurations needed to create a [[HiveClient]].
+   */
+  private[hive] def hiveClientConfigurations(hiveconf: HiveConf): Map[String, String] = {
+    // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
+    // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.).  This breaks backwards-
+    // compatibility when users are trying to connect to a Hive metastore of a lower version,
+    // because these options are expected to be integral values in lower versions of Hive.
+    //
+    // Here we enumerate all time `ConfVar`s and convert their values to numeric strings according
+    // to their output time units.
+    Seq(
+      ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS,
+      ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
+      ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS,
+      ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS,
+      ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS,
+      ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS,
+      ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS,
+      ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS,
+      ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS,
+      ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS,
+      ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS,
+      ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS,
+      ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS,
+      ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS,
+      ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS,
+      ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
+      ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS,
+      ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS,
+      ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS,
+      ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS,
+      ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS,
+      ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS,
+      ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
+      ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS,
+      ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS,
+      ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS,
+      ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS,
+      ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS,
+      ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS,
+      ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS,
+      ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS,
+      ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
+      ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS,
+      ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS,
+      ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS,
+      ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS,
+      ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS,
+      ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS,
+      ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS
+    ).map { case (confVar, unit) =>
+      confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString
+    }.toMap
+  }
+
+  /**
+   * Create a [[HiveClient]] used for execution.
+   *
+   * Currently this must always be [[hiveExecutionVersion]], the version of Hive that is packaged
+   * with Spark SQL. This copy of the client is used for execution related tasks like
+   * registering temporary functions or ensuring that the ThreadLocal SessionState is
+   * correctly populated.  This copy of Hive is *not* used for storing persistent metadata,
+   * and only points to a dummy metastore in a temporary directory.
+   */
+  protected[hive] def newClientForExecution(
+      conf: SparkConf,
+      hadoopConf: Configuration): HiveClientImpl = {
+    logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
+    val loader = new IsolatedClientLoader(
+      version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
+      sparkConf = conf,
+      execJars = Seq(),
+      hadoopConf = hadoopConf,
+      config = newTemporaryConfiguration(useInMemoryDerby = true),
+      isolationOn = false,
+      baseClassLoader = Utils.getContextOrSparkClassLoader)
+    loader.createClient().asInstanceOf[HiveClientImpl]
+  }
+
+  /**
+   * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
+   *
+   * The version of the Hive client that is used here must match the metastore that is configured
+   * in the hive-site.xml file.
+   */
+  private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = {
+    val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
+    val configurations = hiveClientConfigurations(hiveConf)
+    newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
+  }
+
+  protected[hive] def newClientForMetadata(
+      conf: SparkConf,
+      hiveConf: HiveConf,
+      hadoopConf: Configuration,
+      configurations: Map[String, String]): HiveClient = {
+    val sqlConf = new SQLConf
+    sqlConf.setConf(SQLContext.getSQLProperties(conf))
+    val hiveMetastoreVersion = HiveContext.hiveMetastoreVersion(sqlConf)
+    val hiveMetastoreJars = HiveContext.hiveMetastoreJars(sqlConf)
+    val hiveMetastoreSharedPrefixes = HiveContext.hiveMetastoreSharedPrefixes(sqlConf)
+    val hiveMetastoreBarrierPrefixes = HiveContext.hiveMetastoreBarrierPrefixes(sqlConf)
+    val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
+
+    val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir")
+    logInfo("default warehouse location is " + defaultWarehouseLocation)
+
+    // `configurations` goes second to override other settings.
+    val allConfig = hiveConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configurations
+
+    val isolatedLoader = if (hiveMetastoreJars == "builtin") {
+      if (hiveExecutionVersion != hiveMetastoreVersion) {
+        throw new IllegalArgumentException(
+          "Builtin jars can only be used when hive execution version == hive metastore version. " +
+            s"Execution: $hiveExecutionVersion != Metastore: $hiveMetastoreVersion. " +
+            "Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
+            s"or change ${HIVE_METASTORE_VERSION.key} to $hiveExecutionVersion.")
+      }
+
+      // We recursively find all jars in the class loader chain,
+      // starting from the given classLoader.
+      def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
+        case null => Array.empty[URL]
+        case urlClassLoader: URLClassLoader =>
+          urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
+        case other => allJars(other.getParent)
+      }
+
+      val classLoader = Utils.getContextOrSparkClassLoader
+      val jars = allJars(classLoader)
+      if (jars.length == 0) {
+        throw new IllegalArgumentException(
+          "Unable to locate hive jars to connect to metastore. " +
+            "Please set spark.sql.hive.metastore.jars.")
+      }
+
+      logInfo(
+        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
+      new IsolatedClientLoader(
+        version = metaVersion,
+        sparkConf = conf,
+        hadoopConf = hadoopConf,
+        execJars = jars.toSeq,
+        config = allConfig,
+        isolationOn = true,
+        barrierPrefixes = hiveMetastoreBarrierPrefixes,
+        sharedPrefixes = hiveMetastoreSharedPrefixes)
+    } else if (hiveMetastoreJars == "maven") {
+      // TODO: Support for loading the jars from an already downloaded location.
+      logInfo(
+        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
+      IsolatedClientLoader.forVersion(
+        hiveMetastoreVersion = hiveMetastoreVersion,
+        hadoopVersion = VersionInfo.getVersion,
+        sparkConf = conf,
+        hadoopConf = hadoopConf,
+        config = allConfig,
+        barrierPrefixes = hiveMetastoreBarrierPrefixes,
+        sharedPrefixes = hiveMetastoreSharedPrefixes)
+    } else {
+      // Convert to files and expand any directories.
+      val jars =
+        hiveMetastoreJars
+          .split(File.pathSeparator)
+          .flatMap {
+          case path if new File(path).getName == "*" =>
+            val files = new File(path).getParentFile.listFiles()
+            if (files == null) {
+              logWarning(s"Hive jar path '$path' does not exist.")
+              Nil
+            } else {
+              files.filter(_.getName.toLowerCase.endsWith(".jar"))
+            }
+          case path =>
+            new File(path) :: Nil
+        }
+          .map(_.toURI.toURL)
+
+      logInfo(
+        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
+          s"using ${jars.mkString(":")}")
+      new IsolatedClientLoader(
+        version = metaVersion,
+        sparkConf = conf,
+        hadoopConf = hadoopConf,
+        execJars = jars.toSeq,
+        config = allConfig,
+        isolationOn = true,
+        barrierPrefixes = hiveMetastoreBarrierPrefixes,
+        sharedPrefixes = hiveMetastoreSharedPrefixes)
+    }
+    isolatedLoader.createClient()
+  }
+
   /** Constructs a configuration for hive, where the metastore is located in a temp directory. */
   def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String] = {
     val withInMemoryMode = if (useInMemoryDerby) "memory:" else ""

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 27e4cfc..c7066d7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.DataTypeParser
@@ -98,27 +98,33 @@ private[hive] object HiveSerDe {
 }
 
 
-// TODO: replace this with o.a.s.sql.hive.HiveCatalog once we merge SQLContext and HiveContext
+/**
+ * Legacy catalog for interacting with the Hive metastore.
+ *
+ * This is still used for things like creating data source tables, but in the future will be
+ * cleaned up to integrate more nicely with [[HiveCatalog]].
+ */
 private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
-  extends Catalog with Logging {
+  extends Logging {
 
   val conf = hive.conf
 
-  /** Usages should lock on `this`. */
-  protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
-
   /** A fully qualified identifier for a table (i.e., database.tableName) */
   case class QualifiedTableName(database: String, name: String)
 
-  private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
+  private def getCurrentDatabase: String = {
+    hive.sessionState.catalog.getCurrentDatabase
+  }
+
+  def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
     QualifiedTableName(
-      tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
+      tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase,
       tableIdent.table.toLowerCase)
   }
 
   private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
     QualifiedTableName(
-      t.name.database.getOrElse(client.currentDatabase).toLowerCase,
+      t.name.database.getOrElse(getCurrentDatabase).toLowerCase,
       t.name.table.toLowerCase)
   }
 
@@ -194,7 +200,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
     CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
   }
 
-  override def refreshTable(tableIdent: TableIdentifier): Unit = {
+  def refreshTable(tableIdent: TableIdentifier): Unit = {
     // refreshTable does not eagerly reload the cache. It just invalidate the cache.
     // Next time when we use the table, it will be populated in the cache.
     // Since we also cache ParquetRelations converted from Hive Parquet tables and
@@ -408,12 +414,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
     new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString
   }
 
-  override def tableExists(tableIdent: TableIdentifier): Boolean = {
-    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
-    client.getTableOption(dbName, tblName).isDefined
-  }
-
-  override def lookupRelation(
+  def lookupRelation(
       tableIdent: TableIdentifier,
       alias: Option[String]): LogicalPlan = {
     val qualifiedTableName = getQualifiedTableName(tableIdent)
@@ -555,12 +556,6 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
     result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }
 
-  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    val db = databaseName.getOrElse(client.currentDatabase)
-
-    client.listTables(db).map(tableName => (tableName, false))
-  }
-
   /**
    * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
    * data source relations for better performance.
@@ -716,27 +711,6 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
     }
   }
 
-  /**
-   * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
-   * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
-   */
-  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
-    throw new UnsupportedOperationException
-  }
-
-  /**
-   * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
-   * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
-   */
-  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
-    throw new UnsupportedOperationException
-  }
-
-  override def unregisterAllTables(): Unit = {}
-
-  override def setCurrentDatabase(databaseName: String): Unit = {
-    client.setCurrentDatabase(databaseName)
-  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
new file mode 100644
index 0000000..aa44cba
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+
+class HiveSessionCatalog(
+    externalCatalog: HiveCatalog,
+    client: HiveClient,
+    context: HiveContext,
+    conf: SQLConf)
+  extends SessionCatalog(externalCatalog, conf) {
+
+  override def setCurrentDatabase(db: String): Unit = {
+    super.setCurrentDatabase(db)
+    client.setCurrentDatabase(db)
+  }
+
+  override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
+    val table = formatTableName(name.table)
+    if (name.database.isDefined || !tempTables.containsKey(table)) {
+      val newName = name.copy(table = table)
+      metastoreCatalog.lookupRelation(newName, alias)
+    } else {
+      val relation = tempTables.get(table)
+      val tableWithQualifiers = SubqueryAlias(table, relation)
+      // If an alias was specified by the lookup, wrap the plan in a subquery so that
+      // attributes are properly qualified with this alias.
+      alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
+    }
+  }
+
+  // ----------------------------------------------------------------
+  // | Methods and fields for interacting with HiveMetastoreCatalog |
+  // ----------------------------------------------------------------
+
+  // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
+  // essentially a cache for metastore tables. However, it relies on a lot of session-specific
+  // things so it would be a lot of work to split its functionality between HiveSessionCatalog
+  // and HiveCatalog. We should still do it at some point...
+  private val metastoreCatalog = new HiveMetastoreCatalog(client, context)
+
+  val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
+  val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
+  val PreInsertionCasts: Rule[LogicalPlan] = metastoreCatalog.PreInsertionCasts
+
+  override def refreshTable(name: TableIdentifier): Unit = {
+    metastoreCatalog.refreshTable(name)
+  }
+
+  def invalidateTable(name: TableIdentifier): Unit = {
+    metastoreCatalog.invalidateTable(name)
+  }
+
+  def invalidateCache(): Unit = {
+    metastoreCatalog.cachedDataSourceTables.invalidateAll()
+  }
+
+  def createDataSourceTable(
+      name: TableIdentifier,
+      userSpecifiedSchema: Option[StructType],
+      partitionColumns: Array[String],
+      bucketSpec: Option[BucketSpec],
+      provider: String,
+      options: Map[String, String],
+      isExternal: Boolean): Unit = {
+    metastoreCatalog.createDataSourceTable(
+      name, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options, isExternal)
+  }
+
+  def hiveDefaultTableFilePath(name: TableIdentifier): String = {
+    metastoreCatalog.hiveDefaultTableFilePath(name)
+  }
+
+  // For testing only
+  private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+    val key = metastoreCatalog.getQualifiedTableName(table)
+    metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index d9cd96d..caa7f29 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, OverrideCatalog}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.execution.{python, SparkPlanner}
 import org.apache.spark.sql.execution.datasources._
@@ -35,9 +35,11 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
   }
 
   /**
-   * A metadata catalog that points to the Hive metastore.
+   * Internal catalog for managing table and database states.
    */
-  override lazy val catalog = new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog
+  override lazy val catalog = {
+    new HiveSessionCatalog(ctx.hiveCatalog, ctx.metadataHive, ctx, conf)
+  }
 
   /**
    * Internal catalog for managing functions registered by the user.
@@ -61,7 +63,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
         DataSourceAnalysis ::
         (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
 
-      override val extendedCheckRules = Seq(PreWriteCheck(catalog))
+      override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index d214e52..f4d3035 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -53,9 +53,6 @@ private[hive] trait HiveClient {
   /** Returns the names of tables in the given database that matches the given pattern. */
   def listTables(dbName: String, pattern: String): Seq[String]
 
-  /** Returns the name of the active database. */
-  def currentDatabase: String
-
   /** Sets the name of current database. */
   def setCurrentDatabase(databaseName: String): Unit
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 928408c..e4e15d1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -241,10 +241,6 @@ private[hive] class HiveClientImpl(
     state.err = stream
   }
 
-  override def currentDatabase: String = withHiveState {
-    state.getCurrentDatabase
-  }
-
   override def setCurrentDatabase(databaseName: String): Unit = withHiveState {
     if (getDatabaseOption(databaseName).isDefined) {
       state.setCurrentDatabase(databaseName)

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 391e297..5a61eef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -69,10 +69,10 @@ case class CreateTableAsSelect(
         withFormat
       }
 
-      hiveContext.sessionState.catalog.client.createTable(withSchema, ignoreIfExists = false)
+      hiveContext.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
 
       // Get the Metastore Relation
-      hiveContext.sessionState.catalog.lookupRelation(tableIdentifier, None) match {
+      hiveContext.sessionState.catalog.lookupRelation(tableIdentifier) match {
         case r: MetastoreRelation => r
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 8a1cf2c..9ff520d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -56,7 +56,7 @@ private[hive] case class CreateViewAsSelect(
 
       case true if orReplace =>
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        hiveContext.sessionState.catalog.client.alertView(prepareTable(sqlContext))
+        hiveContext.metadataHive.alertView(prepareTable(sqlContext))
 
       case true =>
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
@@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect(
           "CREATE OR REPLACE VIEW AS")
 
       case false =>
-        hiveContext.sessionState.catalog.client.createView(prepareTable(sqlContext))
+        hiveContext.metadataHive.createView(prepareTable(sqlContext))
     }
 
     Seq.empty[Row]

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 4ffd868..430fa46 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -45,7 +45,7 @@ case class InsertIntoHiveTable(
 
   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
-  @transient private lazy val catalog = sc.sessionState.catalog
+  @transient private lazy val client = sc.metadataHive
 
   def output: Seq[Attribute] = Seq.empty
 
@@ -186,8 +186,8 @@ case class InsertIntoHiveTable(
       // TODO: Correctly set isSkewedStoreAsSubdir.
       val isSkewedStoreAsSubdir = false
       if (numDynamicPartitions > 0) {
-        catalog.synchronized {
-          catalog.client.loadDynamicPartitions(
+        client.synchronized {
+          client.loadDynamicPartitions(
             outputPath.toString,
             qualifiedTableName,
             orderedPartitionSpec,
@@ -202,12 +202,12 @@ case class InsertIntoHiveTable(
         // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
         // scalastyle:on
         val oldPart =
-          catalog.client.getPartitionOption(
-            catalog.client.getTable(table.databaseName, table.tableName),
+          client.getPartitionOption(
+            client.getTable(table.databaseName, table.tableName),
             partitionSpec)
 
         if (oldPart.isEmpty || !ifNotExists) {
-            catalog.client.loadPartition(
+            client.loadPartition(
               outputPath.toString,
               qualifiedTableName,
               orderedPartitionSpec,
@@ -218,7 +218,7 @@ case class InsertIntoHiveTable(
         }
       }
     } else {
-      catalog.client.loadTable(
+      client.loadTable(
         outputPath.toString, // TODO: URI
         qualifiedTableName,
         overwrite,

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 226b8e1..cd26a68 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -71,7 +71,8 @@ case class DropTable(
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
-    hiveContext.sessionState.catalog.unregisterTable(TableIdentifier(tableName))
+    hiveContext.sessionState.catalog.dropTable(
+      TableIdentifier(tableName), ignoreIfNotExists = true)
     Seq.empty[Row]
   }
 }
@@ -142,7 +143,8 @@ case class CreateMetastoreDataSource(
     val optionsWithPath =
       if (!options.contains("path") && managedIfNoPath) {
         isExternal = false
-        options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
+        options + ("path" ->
+          hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
       } else {
         options
       }
@@ -200,7 +202,8 @@ case class CreateMetastoreDataSourceAsSelect(
     val optionsWithPath =
       if (!options.contains("path")) {
         isExternal = false
-        options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
+        options + ("path" ->
+          hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
       } else {
         options
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 19c05f9..a1785ca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -24,6 +24,8 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.implicitConversions
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry
 import org.apache.hadoop.hive.ql.processors._
@@ -35,9 +37,11 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.command.CacheTableCommand
+import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -55,10 +59,6 @@ object TestHive
         // SPARK-8910
         .set("spark.ui.enabled", "false")))
 
-trait TestHiveSingleton {
-  protected val sqlContext: SQLContext = TestHive
-  protected val hiveContext: TestHiveContext = TestHive
-}
 
 /**
  * A locally running test instance of Spark's Hive execution engine.
@@ -71,10 +71,87 @@ trait TestHiveSingleton {
  * hive metastore seems to lead to weird non-deterministic failures.  Therefore, the execution of
  * test cases that rely on TestHive must be serialized.
  */
-class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
-  self =>
+class TestHiveContext private[hive](
+    sc: SparkContext,
+    cacheManager: CacheManager,
+    listener: SQLListener,
+    executionHive: HiveClientImpl,
+    metadataHive: HiveClient,
+    isRootContext: Boolean,
+    hiveCatalog: HiveCatalog,
+    val warehousePath: File,
+    val scratchDirPath: File,
+    metastoreTemporaryConf: Map[String, String])
+  extends HiveContext(
+    sc,
+    cacheManager,
+    listener,
+    executionHive,
+    metadataHive,
+    isRootContext,
+    hiveCatalog) { self =>
+
+  // Unfortunately, due to the complex interactions between the construction parameters
+  // and the limitations in Scala constructors, we need many of these constructors to
+  // provide a shorthand to create a new TestHiveContext with only a SparkContext.
+  // This is not a great design pattern but it's necessary here.
+
+  private def this(
+      sc: SparkContext,
+      executionHive: HiveClientImpl,
+      metadataHive: HiveClient,
+      warehousePath: File,
+      scratchDirPath: File,
+      metastoreTemporaryConf: Map[String, String]) {
+    this(
+      sc,
+      new CacheManager,
+      SQLContext.createListenerAndUI(sc),
+      executionHive,
+      metadataHive,
+      true,
+      new HiveCatalog(metadataHive),
+      warehousePath,
+      scratchDirPath,
+      metastoreTemporaryConf)
+  }
+
+  private def this(
+      sc: SparkContext,
+      warehousePath: File,
+      scratchDirPath: File,
+      metastoreTemporaryConf: Map[String, String]) {
+    this(
+      sc,
+      HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
+      TestHiveContext.newClientForMetadata(
+        sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf),
+      warehousePath,
+      scratchDirPath,
+      metastoreTemporaryConf)
+  }
 
-  import HiveContext._
+  def this(sc: SparkContext) {
+    this(
+      sc,
+      Utils.createTempDir(namePrefix = "warehouse"),
+      TestHiveContext.makeScratchDir(),
+      HiveContext.newTemporaryConfiguration(useInMemoryDerby = false))
+  }
+
+  override def newSession(): HiveContext = {
+    new TestHiveContext(
+      sc = sc,
+      cacheManager = cacheManager,
+      listener = listener,
+      executionHive = executionHive.newSession(),
+      metadataHive = metadataHive.newSession(),
+      isRootContext = false,
+      hiveCatalog = hiveCatalog,
+      warehousePath = warehousePath,
+      scratchDirPath = scratchDirPath,
+      metastoreTemporaryConf = metastoreTemporaryConf)
+  }
 
   // By clearing the port we force Spark to pick a new one.  This allows us to rerun tests
   // without restarting the JVM.
@@ -83,24 +160,12 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
 
   hiveconf.set("hive.plan.serialization.format", "javaXML")
 
-  lazy val warehousePath = Utils.createTempDir(namePrefix = "warehouse-")
-
-  lazy val scratchDirPath = {
-    val dir = Utils.createTempDir(namePrefix = "scratch-")
-    dir.delete()
-    dir
-  }
-
-  private lazy val temporaryConfig = newTemporaryConfiguration(useInMemoryDerby = false)
-
-  /** Sets up the system initially or after a RESET command */
-  protected override def configure(): Map[String, String] = {
-    super.configure() ++ temporaryConfig ++ Map(
-      ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
-      ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
-      ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
-      ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1"
-    )
+  // A snapshot of the entries in the starting SQLConf.
+  // We save this because tests can mutate this singleton object if they want.
+  val initialSQLConf: SQLConf = {
+    val snapshot = new SQLConf
+    conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) }
+    snapshot
   }
 
   val testTempDir = Utils.createTempDir()
@@ -427,9 +492,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
 
       cacheManager.clearCache()
       loadedTables.clear()
-      sessionState.catalog.cachedDataSourceTables.invalidateAll()
-      sessionState.catalog.client.reset()
-      sessionState.catalog.unregisterAllTables()
+      sessionState.catalog.clearTempTables()
+      sessionState.catalog.invalidateCache()
+      metadataHive.reset()
 
       FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
         foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
@@ -448,13 +513,13 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
       // Lots of tests fail if we do not change the partition whitelist from the default.
       runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
 
-      configure().foreach {
-        case (k, v) =>
-          metadataHive.runSqlHive(s"SET $k=$v")
-      }
+      // In case a test changed any of these values, restore all the original ones here.
+      TestHiveContext.hiveClientConfigurations(
+        hiveconf, warehousePath, scratchDirPath, metastoreTemporaryConf)
+          .foreach { case (k, v) => metadataHive.runSqlHive(s"SET $k=$v") }
       defaultOverrides()
 
-      runSqlHive("USE default")
+      sessionState.catalog.setCurrentDatabase("default")
     } catch {
       case e: Exception =>
         logError("FATAL ERROR: Failed to reset TestDB state.", e)
@@ -490,4 +555,43 @@ private[hive] object TestHiveContext {
       // Fewer shuffle partitions to speed up testing.
       SQLConf.SHUFFLE_PARTITIONS.key -> "5"
     )
+
+  /**
+   * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
+   */
+  private def newClientForMetadata(
+      conf: SparkConf,
+      hadoopConf: Configuration,
+      warehousePath: File,
+      scratchDirPath: File,
+      metastoreTemporaryConf: Map[String, String]): HiveClient = {
+    val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
+    HiveContext.newClientForMetadata(
+      conf,
+      hiveConf,
+      hadoopConf,
+      hiveClientConfigurations(hiveConf, warehousePath, scratchDirPath, metastoreTemporaryConf))
+  }
+
+  /**
+   * Configurations needed to create a [[HiveClient]].
+   */
+  private def hiveClientConfigurations(
+      hiveconf: HiveConf,
+      warehousePath: File,
+      scratchDirPath: File,
+      metastoreTemporaryConf: Map[String, String]): Map[String, String] = {
+    HiveContext.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ Map(
+      ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
+      ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
+      ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
+      ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1")
+  }
+
+  private def makeScratchDir(): File = {
+    val scratchDir = Utils.createTempDir(namePrefix = "scratch")
+    scratchDir.delete()
+    scratchDir
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index bd14a24..2fc38e2 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -70,8 +70,9 @@ public class JavaMetastoreDataSourcesSuite {
     if (path.exists()) {
       path.delete();
     }
-    hiveManagedPath = new Path(sqlContext.sessionState().catalog().hiveDefaultTableFilePath(
-      new TableIdentifier("javaSavedTable")));
+    hiveManagedPath = new Path(
+      sqlContext.sessionState().catalog().hiveDefaultTableFilePath(
+        new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
     if (fs.exists(hiveManagedPath)){
       fs.delete(hiveManagedPath, true);

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
new file mode 100644
index 0000000..154ada3
--- /dev/null
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.test
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.SQLContext
+
+
+trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {
+  protected val sqlContext: SQLContext = TestHive
+  protected val hiveContext: TestHiveContext = TestHive
+
+  protected override def afterAll(): Unit = {
+    try {
+      hiveContext.reset()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala
index 72765f0..4c9c48a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala
@@ -26,6 +26,7 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   import testImplicits._
 
   protected override def beforeAll(): Unit = {
+    super.beforeAll()
     sql("DROP TABLE IF EXISTS t0")
     sql("DROP TABLE IF EXISTS t1")
     sql("DROP TABLE IF EXISTS t2")
@@ -43,9 +44,13 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   }
 
   override protected def afterAll(): Unit = {
-    sql("DROP TABLE IF EXISTS t0")
-    sql("DROP TABLE IF EXISTS t1")
-    sql("DROP TABLE IF EXISTS t2")
+    try {
+      sql("DROP TABLE IF EXISTS t0")
+      sql("DROP TABLE IF EXISTS t1")
+      sql("DROP TABLE IF EXISTS t2")
+    } finally {
+      super.afterAll()
+    }
   }
 
   private def checkSqlGeneration(hiveQl: String): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
new file mode 100644
index 0000000..b644a50
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
@@ -0,0 +1,37 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.hive.test.TestHive
+
+
+class HiveContextSuite extends SparkFunSuite {
+
+  test("HiveContext can access `spark.sql.*` configs") {
+    // Avoid creating another SparkContext in the same JVM
+    val sc = TestHive.sparkContext
+    require(sc.conf.get("spark.sql.hive.metastore.barrierPrefixes") ==
+      "org.apache.spark.sql.hive.execution.PairSerDe")
+    assert(TestHive.initialSQLConf.getConfString("spark.sql.hive.metastore.barrierPrefixes") ==
+      "org.apache.spark.sql.hive.execution.PairSerDe")
+    assert(TestHive.metadataHive.getConf("spark.sql.hive.metastore.barrierPrefixes", "") ==
+      "org.apache.spark.sql.hive.execution.PairSerDe")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
index 35e4339..57f96e7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
@@ -33,12 +33,17 @@ class HiveDataFrameAnalyticsSuite extends QueryTest with TestHiveSingleton with
   private var testData: DataFrame = _
 
   override def beforeAll() {
+    super.beforeAll()
     testData = Seq((1, 2), (2, 2), (3, 4)).toDF("a", "b")
     hiveContext.registerDataFrameAsTable(testData, "mytable")
   }
 
   override def afterAll(): Unit = {
-    hiveContext.dropTempTable("mytable")
+    try {
+      hiveContext.dropTempTable("mytable")
+    } finally {
+      super.afterAll()
+    }
   }
 
   test("rollup") {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index ce7b08a..6967395 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.hive
 
 import java.io.File
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{QueryTest, Row, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 
-class HiveMetastoreCatalogSuite extends SparkFunSuite with TestHiveSingleton {
+class HiveMetastoreCatalogSuite extends TestHiveSingleton {
   import hiveContext.implicits._
 
   test("struct field should accept underscore in sub-column name") {
@@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
             .saveAsTable("t")
         }
 
-        val hiveTable = sessionState.catalog.client.getTable("default", "t")
+        val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
         assert(hiveTable.storage.inputFormat === Some(inputFormat))
         assert(hiveTable.storage.outputFormat === Some(outputFormat))
         assert(hiveTable.storage.serde === Some(serde))
@@ -114,7 +114,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
               .saveAsTable("t")
           }
 
-          val hiveTable = sessionState.catalog.client.getTable("default", "t")
+          val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
           assert(hiveTable.storage.inputFormat === Some(inputFormat))
           assert(hiveTable.storage.outputFormat === Some(outputFormat))
           assert(hiveTable.storage.serde === Some(serde))
@@ -144,7 +144,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
                |AS SELECT 1 AS d1, "val_1" AS d2
              """.stripMargin)
 
-          val hiveTable = sessionState.catalog.client.getTable("default", "t")
+          val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
           assert(hiveTable.storage.inputFormat === Some(inputFormat))
           assert(hiveTable.storage.outputFormat === Some(outputFormat))
           assert(hiveTable.storage.serde === Some(serde))

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 0a31ac6..5272f41 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -31,18 +31,25 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
   val df = sparkContext.parallelize((1 to 10).map(i => (i, s"str$i"))).toDF("key", "value")
 
   override def beforeAll(): Unit = {
+    super.beforeAll()
     // The catalog in HiveContext is a case insensitive one.
-    sessionState.catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
+    sessionState.catalog.createTempTable(
+      "ListTablesSuiteTable", df.logicalPlan, ignoreIfExists = true)
     sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
     sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")
     sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)")
   }
 
   override def afterAll(): Unit = {
-    sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
-    sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
-    sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
-    sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
+    try {
+      sessionState.catalog.dropTable(
+        TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
+      sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
+      sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
+      sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
+    } finally {
+      super.afterAll()
+    }
   }
 
   test("get all tables of current database") {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index f6b9072..c9bcf81 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -27,6 +27,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   import testImplicits._
 
   protected override def beforeAll(): Unit = {
+    super.beforeAll()
     sql("DROP TABLE IF EXISTS parquet_t0")
     sql("DROP TABLE IF EXISTS parquet_t1")
     sql("DROP TABLE IF EXISTS parquet_t2")
@@ -64,11 +65,15 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   }
 
   override protected def afterAll(): Unit = {
-    sql("DROP TABLE IF EXISTS parquet_t0")
-    sql("DROP TABLE IF EXISTS parquet_t1")
-    sql("DROP TABLE IF EXISTS parquet_t2")
-    sql("DROP TABLE IF EXISTS parquet_t3")
-    sql("DROP TABLE IF EXISTS t0")
+    try {
+      sql("DROP TABLE IF EXISTS parquet_t0")
+      sql("DROP TABLE IF EXISTS parquet_t1")
+      sql("DROP TABLE IF EXISTS parquet_t2")
+      sql("DROP TABLE IF EXISTS parquet_t3")
+      sql("DROP TABLE IF EXISTS t0")
+    } finally {
+      super.afterAll()
+    }
   }
 
   private def checkHiveQl(hiveQl: String): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 3f3d069..7165289 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -44,6 +44,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   var jsonFilePath: String = _
 
   override def beforeAll(): Unit = {
+    super.beforeAll()
     jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
   }
 
@@ -693,13 +694,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   test("SPARK-6024 wide schema support") {
     withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") {
       withTable("wide_schema") {
-        withTempDir( tempDir => {
+        withTempDir { tempDir =>
           // We will need 80 splits for this schema if the threshold is 4000.
           val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
 
           // Manually create a metastore data source table.
           sessionState.catalog.createDataSourceTable(
-            tableIdent = TableIdentifier("wide_schema"),
+            name = TableIdentifier("wide_schema"),
             userSpecifiedSchema = Some(schema),
             partitionColumns = Array.empty[String],
             bucketSpec = None,
@@ -711,7 +712,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
           val actualSchema = table("wide_schema").schema
           assert(schema === actualSchema)
-        })
+        }
       }
     }
   }
@@ -737,7 +738,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           "spark.sql.sources.schema" -> schema.json,
           "EXTERNAL" -> "FALSE"))
 
-      sessionState.catalog.client.createTable(hiveTable, ignoreIfExists = false)
+      hiveCatalog.createTable("default", hiveTable, ignoreIfExists = false)
 
       invalidateTable(tableName)
       val actualSchema = table(tableName).schema
@@ -752,7 +753,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     withTable(tableName) {
       df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
       invalidateTable(tableName)
-      val metastoreTable = sessionState.catalog.client.getTable("default", tableName)
+      val metastoreTable = hiveCatalog.getTable("default", tableName)
       val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
 
       val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt
@@ -787,7 +788,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .sortBy("c")
         .saveAsTable(tableName)
       invalidateTable(tableName)
-      val metastoreTable = sessionState.catalog.client.getTable("default", tableName)
+      val metastoreTable = hiveCatalog.getTable("default", tableName)
       val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
       val expectedSortByColumns = StructType(df.schema("c") :: Nil)
 
@@ -903,11 +904,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
 
   test("skip hive metadata on table creation") {
-    withTempDir(tempPath => {
+    withTempDir { tempPath =>
       val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
 
       sessionState.catalog.createDataSourceTable(
-        tableIdent = TableIdentifier("not_skip_hive_metadata"),
+        name = TableIdentifier("not_skip_hive_metadata"),
         userSpecifiedSchema = Some(schema),
         partitionColumns = Array.empty[String],
         bucketSpec = None,
@@ -917,11 +918,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
       // As a proxy for verifying that the table was stored in Hive compatible format,
       // we verify that each column of the table is of native type StringType.
-      assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema
+      assert(hiveCatalog.getTable("default", "not_skip_hive_metadata").schema
         .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
 
       sessionState.catalog.createDataSourceTable(
-        tableIdent = TableIdentifier("skip_hive_metadata"),
+        name = TableIdentifier("skip_hive_metadata"),
         userSpecifiedSchema = Some(schema),
         partitionColumns = Array.empty[String],
         bucketSpec = None,
@@ -929,10 +930,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "true"),
         isExternal = false)
 
-      // As a proxy for verifying that the table was stored in SparkSQL format, we verify that
-      // the table has a column type as array of StringType.
-      assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema
-        .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
-    })
+      // As a proxy for verifying that the table was stored in Spark SQL format,
+      // we verify that every column of the table has type ArrayType(StringType).
+      assert(hiveCatalog.getTable("default", "skip_hive_metadata").schema.forall { c =>
+        HiveMetastoreTypes.toDataType(c.dataType) == ArrayType(StringType)
+      })
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index f3af60a..3c00350 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -25,9 +25,8 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   private lazy val df = sqlContext.range(10).coalesce(1).toDF()
 
   private def checkTablePath(dbName: String, tableName: String): Unit = {
-    val metastoreTable = hiveContext.sessionState.catalog.client.getTable(dbName, tableName)
-    val expectedPath =
-      hiveContext.sessionState.catalog.client.getDatabase(dbName).locationUri + "/" + tableName
+    val metastoreTable = hiveContext.hiveCatalog.getTable(dbName, tableName)
+    val expectedPath = hiveContext.hiveCatalog.getDatabase(dbName).locationUri + "/" + tableName
 
     assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 151aacb..ae026ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -121,7 +121,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
     intercept[UnsupportedOperationException] {
       hiveContext.analyze("tempTable")
     }
-    hiveContext.sessionState.catalog.unregisterTable(TableIdentifier("tempTable"))
+    hiveContext.sessionState.catalog.dropTable(
+      TableIdentifier("tempTable"), ignoreIfNotExists = true)
   }
 
   test("estimates the size of a test MetastoreRelation") {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 2950692..d59bca4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -171,10 +171,6 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(client.listTables("default") === Seq("src"))
     }
 
-    test(s"$version: currentDatabase") {
-      assert(client.currentDatabase === "default")
-    }
-
     test(s"$version: getDatabase") {
       client.getDatabase("default")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 81fd712..94fbcb7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -189,10 +189,14 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
   }
 
   override def afterAll(): Unit = {
-    sqlContext.sql("DROP TABLE IF EXISTS agg1")
-    sqlContext.sql("DROP TABLE IF EXISTS agg2")
-    sqlContext.sql("DROP TABLE IF EXISTS agg3")
-    sqlContext.dropTempTable("emptyTable")
+    try {
+      sqlContext.sql("DROP TABLE IF EXISTS agg1")
+      sqlContext.sql("DROP TABLE IF EXISTS agg2")
+      sqlContext.sql("DROP TABLE IF EXISTS agg3")
+      sqlContext.dropTempTable("emptyTable")
+    } finally {
+      super.afterAll()
+    }
   }
 
   test("group by function") {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 5fe85ea..197a123 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -49,6 +49,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   import org.apache.spark.sql.hive.test.TestHive.implicits._
 
   override def beforeAll() {
+    super.beforeAll()
     TestHive.cacheTables = true
     // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
     TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
@@ -57,11 +58,14 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   }
 
   override def afterAll() {
-    TestHive.cacheTables = false
-    TimeZone.setDefault(originalTimeZone)
-    Locale.setDefault(originalLocale)
-    sql("DROP TEMPORARY FUNCTION udtf_count2")
-    super.afterAll()
+    try {
+      TestHive.cacheTables = false
+      TimeZone.setDefault(originalTimeZone)
+      Locale.setDefault(originalLocale)
+      sql("DROP TEMPORARY FUNCTION udtf_count2")
+    } finally {
+      super.afterAll()
+    }
   }
 
   test("SPARK-4908: concurrent hive native commands") {
@@ -1209,7 +1213,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     sql("USE hive_test_db")
     assert("hive_test_db" == sql("select current_database()").first().getString(0))
 
-    intercept[NoSuchDatabaseException] {
+    intercept[AnalysisException] {
       sql("USE not_existing_db")
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index d7c529a..b0e263d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -47,7 +47,7 @@ case class ListStringCaseClass(l: Seq[String])
  */
 class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
 
-  import hiveContext.{udf, sql}
+  import hiveContext.udf
   import hiveContext.implicits._
 
   test("spark sql udf test that returns a struct") {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index bc8896d..6199253 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1325,6 +1325,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         .format("parquet")
         .save(path)
 
+      // We don't support creating a temporary table while specifying a database
       val message = intercept[AnalysisException] {
         sqlContext.sql(
           s"""
@@ -1335,9 +1336,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
           |)
         """.stripMargin)
       }.getMessage
-      assert(message.contains("Specifying database name or other qualifiers are not allowed"))
 
-      // If you use backticks to quote the name of a temporary table having dot in it.
+      // If you use backticks to quote the name then it's OK.
       sqlContext.sql(
         s"""
           |CREATE TEMPORARY TABLE `db.t`

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
index ea82b8c..c6b7eb6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.test.SQLTestUtils
 class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
   override def beforeAll(): Unit = {
+    super.beforeAll()
     sql("DROP TABLE IF EXISTS part")
     sql(
       """
@@ -50,7 +51,11 @@ class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleto
   }
 
   override def afterAll(): Unit = {
-    sql("DROP TABLE IF EXISTS part")
+    try {
+      sql("DROP TABLE IF EXISTS part")
+    } finally {
+      super.afterAll()
+    }
   }
 
   test("windowing.q -- 15. testExpressions") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[3/3] spark git commit: [SPARK-14014][SQL] Integrate session catalog (attempt #2)

Posted by an...@apache.org.
[SPARK-14014][SQL] Integrate session catalog (attempt #2)

## What changes were proposed in this pull request?

This reopens #11836, which was merged but promptly reverted because it introduced flaky Hive tests.

## How was this patch tested?

See `CatalogTestCases`, `SessionCatalogSuite` and `HiveContextSuite`.

Author: Andrew Or <an...@databricks.com>

Closes #11938 from andrewor14/session-catalog-again.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20ddf5fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20ddf5fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20ddf5fd

Branch: refs/heads/master
Commit: 20ddf5fddf40b543edc61d6e4687988489dea64c
Parents: 1c70b76
Author: Andrew Or <an...@databricks.com>
Authored: Thu Mar 24 22:59:35 2016 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Mar 24 22:59:35 2016 -0700

----------------------------------------------------------------------
 R/pkg/inst/tests/testthat/test_sparkSQL.R       |   3 +-
 project/MimaExcludes.scala                      |   3 +
 python/pyspark/sql/context.py                   |   2 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  20 +-
 .../spark/sql/catalyst/analysis/Catalog.scala   | 218 --------
 .../sql/catalyst/analysis/unresolved.scala      |   2 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  35 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   | 123 +++--
 .../spark/sql/catalyst/catalog/interface.scala  |   2 +
 .../sql/catalyst/analysis/AnalysisSuite.scala   |   6 +-
 .../sql/catalyst/analysis/AnalysisTest.scala    |  23 +-
 .../analysis/DecimalPrecisionSuite.scala        |  25 +-
 .../sql/catalyst/catalog/CatalogTestCases.scala |   3 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  |  20 +-
 .../optimizer/BooleanSimplificationSuite.scala  |  11 +-
 .../optimizer/EliminateSortsSuite.scala         |   5 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  73 ++-
 .../spark/sql/execution/command/commands.scala  |   8 +-
 .../spark/sql/execution/datasources/ddl.scala   |  24 +-
 .../spark/sql/execution/datasources/rules.scala |  10 +-
 .../spark/sql/internal/SessionState.scala       |   7 +-
 .../org/apache/spark/sql/ListTablesSuite.scala  |  15 +-
 .../org/apache/spark/sql/SQLContextSuite.scala  |   9 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  22 +-
 .../datasources/parquet/ParquetQuerySuite.scala |   6 +-
 .../apache/spark/sql/test/SQLTestUtils.scala    |   4 +-
 .../hive/thriftserver/SparkSQLCLIDriver.scala   |   3 +-
 .../spark/sql/hive/thriftserver/CliSuite.scala  |   5 +-
 .../hive/execution/HiveCompatibilitySuite.scala |  23 +-
 .../org/apache/spark/sql/hive/HiveCatalog.scala |   5 +-
 .../org/apache/spark/sql/hive/HiveContext.scala | 498 ++++++++++---------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  60 +--
 .../spark/sql/hive/HiveSessionCatalog.scala     | 104 ++++
 .../spark/sql/hive/HiveSessionState.scala       |  10 +-
 .../spark/sql/hive/client/HiveClient.scala      |   3 -
 .../spark/sql/hive/client/HiveClientImpl.scala  |   4 -
 .../hive/execution/CreateTableAsSelect.scala    |   4 +-
 .../sql/hive/execution/CreateViewAsSelect.scala |   4 +-
 .../hive/execution/InsertIntoHiveTable.scala    |  14 +-
 .../spark/sql/hive/execution/commands.scala     |   9 +-
 .../apache/spark/sql/hive/test/TestHive.scala   | 172 +++++--
 .../sql/hive/JavaMetastoreDataSourcesSuite.java |   5 +-
 .../spark/sql/hive/test/TestHiveSingleton.scala |  38 ++
 .../spark/sql/hive/ExpressionToSQLSuite.scala   |  11 +-
 .../spark/sql/hive/HiveContextSuite.scala       |  37 ++
 .../sql/hive/HiveDataFrameAnalyticsSuite.scala  |   7 +-
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |  10 +-
 .../apache/spark/sql/hive/ListTablesSuite.scala |  17 +-
 .../spark/sql/hive/LogicalPlanToSQLSuite.scala  |  15 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  32 +-
 .../spark/sql/hive/MultiDatabaseSuite.scala     |   5 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala |   3 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |   4 -
 .../hive/execution/AggregationQuerySuite.scala  |  12 +-
 .../sql/hive/execution/HiveQuerySuite.scala     |  16 +-
 .../spark/sql/hive/execution/HiveUDFSuite.scala |   2 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   4 +-
 .../sql/hive/execution/WindowQuerySuite.scala   |   7 +-
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |   4 +-
 .../spark/sql/hive/orc/OrcSourceSuite.scala     |   8 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  25 +-
 61 files changed, 1043 insertions(+), 816 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 63acbad..eef365b 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1817,7 +1817,8 @@ test_that("approxQuantile() on a DataFrame", {
 
 test_that("SQL error message is returned from JVM", {
   retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
-  expect_equal(grepl("Table not found: blah", retError), TRUE)
+  expect_equal(grepl("Table not found", retError), TRUE)
+  expect_equal(grepl("blah", retError), TRUE)
 })
 
 irisDF <- suppressWarnings(createDataFrame(sqlContext, iris))

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 42eafcb..9158983 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -563,6 +563,9 @@ object MimaExcludes {
         ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerEvent.logEvent"),
         ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.OutputWriterFactory.newInstance")
       ) ++ Seq(
+        // [SPARK-14014] Replace existing analysis.Catalog with SessionCatalog
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.this")
+      ) ++ Seq(
         // [SPARK-13928] Move org.apache.spark.Logging into org.apache.spark.internal.Logging
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Logging"),
         (problem: Problem) => problem match {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 9c2f6a3..4008332 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -554,7 +554,7 @@ class SQLContext(object):
         >>> sqlContext.registerDataFrameAsTable(df, "table1")
         >>> "table1" in sqlContext.tableNames()
         True
-        >>> "table1" in sqlContext.tableNames("db")
+        >>> "table1" in sqlContext.tableNames("default")
         True
         """
         if dbName is None:

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d0a31e7..b344e04 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -36,23 +37,22 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.types._
 
 /**
- * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
- * when all relations are already filled in and the analyzer needs only to resolve attribute
- * references.
+ * A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
+ * Used for testing when all relations are already filled in and the analyzer needs only
+ * to resolve attribute references.
  */
 object SimpleAnalyzer
-  extends Analyzer(
-    EmptyCatalog,
-    EmptyFunctionRegistry,
-    new SimpleCatalystConf(caseSensitiveAnalysis = true))
+  extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true))
+class SimpleAnalyzer(conf: CatalystConf)
+  extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf)
 
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
- * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
- * a [[FunctionRegistry]].
+ * [[UnresolvedRelation]]s into fully typed objects using information in a
+ * [[SessionCatalog]] and a [[FunctionRegistry]].
  */
 class Analyzer(
-    catalog: Catalog,
+    catalog: SessionCatalog,
     registry: FunctionRegistry,
     conf: CatalystConf,
     maxIterations: Int = 100)

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
deleted file mode 100644
index 2f0a4db..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.analysis
-
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-
-
-/**
- * An interface for looking up relations by name.  Used by an [[Analyzer]].
- */
-trait Catalog {
-
-  val conf: CatalystConf
-
-  def tableExists(tableIdent: TableIdentifier): Boolean
-
-  def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan
-
-  def setCurrentDatabase(databaseName: String): Unit = {
-    throw new UnsupportedOperationException
-  }
-
-  /**
-   * Returns tuples of (tableName, isTemporary) for all tables in the given database.
-   * isTemporary is a Boolean value indicates if a table is a temporary or not.
-   */
-  def getTables(databaseName: Option[String]): Seq[(String, Boolean)]
-
-  def refreshTable(tableIdent: TableIdentifier): Unit
-
-  def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit
-
-  def unregisterTable(tableIdent: TableIdentifier): Unit
-
-  def unregisterAllTables(): Unit
-
-  /**
-   * Get the table name of TableIdentifier for temporary tables.
-   */
-  protected def getTableName(tableIdent: TableIdentifier): String = {
-    // It is not allowed to specify database name for temporary tables.
-    // We check it here and throw exception if database is defined.
-    if (tableIdent.database.isDefined) {
-      throw new AnalysisException("Specifying database name or other qualifiers are not allowed " +
-        "for temporary tables. If the table name has dots (.) in it, please quote the " +
-        "table name with backticks (`).")
-    }
-    if (conf.caseSensitiveAnalysis) {
-      tableIdent.table
-    } else {
-      tableIdent.table.toLowerCase
-    }
-  }
-}
-
-class SimpleCatalog(val conf: CatalystConf) extends Catalog {
-  private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]
-
-  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
-    tables.put(getTableName(tableIdent), plan)
-  }
-
-  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
-    tables.remove(getTableName(tableIdent))
-  }
-
-  override def unregisterAllTables(): Unit = {
-    tables.clear()
-  }
-
-  override def tableExists(tableIdent: TableIdentifier): Boolean = {
-    tables.containsKey(getTableName(tableIdent))
-  }
-
-  override def lookupRelation(
-      tableIdent: TableIdentifier,
-      alias: Option[String] = None): LogicalPlan = {
-    val tableName = getTableName(tableIdent)
-    val table = tables.get(tableName)
-    if (table == null) {
-      throw new AnalysisException("Table not found: " + tableName)
-    }
-    val qualifiedTable = SubqueryAlias(tableName, table)
-
-    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
-    // properly qualified with this alias.
-    alias
-      .map(a => SubqueryAlias(a, qualifiedTable))
-      .getOrElse(qualifiedTable)
-  }
-
-  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    tables.keySet().asScala.map(_ -> true).toSeq
-  }
-
-  override def refreshTable(tableIdent: TableIdentifier): Unit = {
-    throw new UnsupportedOperationException
-  }
-}
-
-/**
- * A trait that can be mixed in with other Catalogs allowing specific tables to be overridden with
- * new logical plans.  This can be used to bind query result to virtual tables, or replace tables
- * with in-memory cached versions.  Note that the set of overrides is stored in memory and thus
- * lost when the JVM exits.
- */
-trait OverrideCatalog extends Catalog {
-  private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan]
-
-  private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = {
-    if (tableIdent.database.isDefined) {
-      None
-    } else {
-      Option(overrides.get(getTableName(tableIdent)))
-    }
-  }
-
-  abstract override def tableExists(tableIdent: TableIdentifier): Boolean = {
-    getOverriddenTable(tableIdent) match {
-      case Some(_) => true
-      case None => super.tableExists(tableIdent)
-    }
-  }
-
-  abstract override def lookupRelation(
-      tableIdent: TableIdentifier,
-      alias: Option[String] = None): LogicalPlan = {
-    getOverriddenTable(tableIdent) match {
-      case Some(table) =>
-        val tableName = getTableName(tableIdent)
-        val qualifiedTable = SubqueryAlias(tableName, table)
-
-        // If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes
-        // are properly qualified with this alias.
-        alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
-
-      case None => super.lookupRelation(tableIdent, alias)
-    }
-  }
-
-  abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName)
-  }
-
-  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
-    overrides.put(getTableName(tableIdent), plan)
-  }
-
-  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
-    if (tableIdent.database.isEmpty) {
-      overrides.remove(getTableName(tableIdent))
-    }
-  }
-
-  override def unregisterAllTables(): Unit = {
-    overrides.clear()
-  }
-}
-
-/**
- * A trivial catalog that returns an error when a relation is requested.  Used for testing when all
- * relations are already filled in and the analyzer needs only to resolve attribute references.
- */
-object EmptyCatalog extends Catalog {
-
-  override val conf: CatalystConf = EmptyConf
-
-  override def tableExists(tableIdent: TableIdentifier): Boolean = {
-    throw new UnsupportedOperationException
-  }
-
-  override def lookupRelation(
-      tableIdent: TableIdentifier,
-      alias: Option[String] = None): LogicalPlan = {
-    throw new UnsupportedOperationException
-  }
-
-  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    throw new UnsupportedOperationException
-  }
-
-  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
-    throw new UnsupportedOperationException
-  }
-
-  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
-    throw new UnsupportedOperationException
-  }
-
-  override def unregisterAllTables(): Unit = {
-    throw new UnsupportedOperationException
-  }
-
-  override def refreshTable(tableIdent: TableIdentifier): Unit = {
-    throw new UnsupportedOperationException
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 9518309..e73d367 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
   errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)
 
 /**
- * Holds the name of a relation that has yet to be looked up in a [[Catalog]].
+ * Holds the name of a relation that has yet to be looked up in a catalog.
  */
 case class UnresolvedRelation(
     tableIdentifier: TableIdentifier,

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 7ead1dd..e216fa5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -52,37 +52,34 @@ class InMemoryCatalog extends ExternalCatalog {
     names.filter { funcName => regex.pattern.matcher(funcName).matches() }
   }
 
-  private def existsFunction(db: String, funcName: String): Boolean = {
+  private def functionExists(db: String, funcName: String): Boolean = {
     requireDbExists(db)
     catalog(db).functions.contains(funcName)
   }
 
-  private def existsTable(db: String, table: String): Boolean = {
-    requireDbExists(db)
-    catalog(db).tables.contains(table)
-  }
-
-  private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
+  private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
     requireTableExists(db, table)
     catalog(db).tables(table).partitions.contains(spec)
   }
 
   private def requireFunctionExists(db: String, funcName: String): Unit = {
-    if (!existsFunction(db, funcName)) {
-      throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'")
+    if (!functionExists(db, funcName)) {
+      throw new AnalysisException(
+        s"Function not found: '$funcName' does not exist in database '$db'")
     }
   }
 
   private def requireTableExists(db: String, table: String): Unit = {
-    if (!existsTable(db, table)) {
-      throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
+    if (!tableExists(db, table)) {
+      throw new AnalysisException(
+        s"Table not found: '$table' does not exist in database '$db'")
     }
   }
 
   private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
-    if (!existsPartition(db, table, spec)) {
+    if (!partitionExists(db, table, spec)) {
       throw new AnalysisException(
-        s"Partition does not exist in database '$db' table '$table': '$spec'")
+        s"Partition not found: database '$db' table '$table' does not contain: '$spec'")
     }
   }
 
@@ -159,7 +156,7 @@ class InMemoryCatalog extends ExternalCatalog {
       ignoreIfExists: Boolean): Unit = synchronized {
     requireDbExists(db)
     val table = tableDefinition.name.table
-    if (existsTable(db, table)) {
+    if (tableExists(db, table)) {
       if (!ignoreIfExists) {
         throw new AnalysisException(s"Table '$table' already exists in database '$db'")
       }
@@ -173,7 +170,7 @@ class InMemoryCatalog extends ExternalCatalog {
       table: String,
       ignoreIfNotExists: Boolean): Unit = synchronized {
     requireDbExists(db)
-    if (existsTable(db, table)) {
+    if (tableExists(db, table)) {
       catalog(db).tables.remove(table)
     } else {
       if (!ignoreIfNotExists) {
@@ -200,13 +197,17 @@ class InMemoryCatalog extends ExternalCatalog {
     catalog(db).tables(table).table
   }
 
+  override def tableExists(db: String, table: String): Boolean = synchronized {
+    requireDbExists(db)
+    catalog(db).tables.contains(table)
+  }
+
   override def listTables(db: String): Seq[String] = synchronized {
     requireDbExists(db)
     catalog(db).tables.keySet.toSeq
   }
 
   override def listTables(db: String, pattern: String): Seq[String] = synchronized {
-    requireDbExists(db)
     filterPattern(listTables(db), pattern)
   }
 
@@ -295,7 +296,7 @@ class InMemoryCatalog extends ExternalCatalog {
 
   override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
     requireDbExists(db)
-    if (existsFunction(db, func.name.funcName)) {
+    if (functionExists(db, func.name.funcName)) {
       throw new AnalysisException(s"Function '$func' already exists in '$db' database")
     } else {
       catalog(db).functions.put(func.name.funcName, func)

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 3ac2bcf..34265fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 
@@ -31,17 +32,34 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
  * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
  * tables and functions of the Spark Session that it belongs to.
  */
-class SessionCatalog(externalCatalog: ExternalCatalog) {
+class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
   import ExternalCatalog._
 
-  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
-  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
+  def this(externalCatalog: ExternalCatalog) {
+    this(externalCatalog, new SimpleCatalystConf(true))
+  }
+
+  protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
+  protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
 
   // Note: we track current database here because certain operations do not explicitly
   // specify the database (e.g. DROP TABLE my_table). In these cases we must first
   // check whether the temporary table or function exists, then, if not, operate on
   // the corresponding item in the current database.
-  private[this] var currentDb = "default"
+  protected[this] var currentDb = {
+    val defaultName = "default"
+    val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map())
+    // Initialize default database if it doesn't already exist
+    createDatabase(defaultDbDefinition, ignoreIfExists = true)
+    defaultName
+  }
+
+  /**
+   * Format table name, taking into account case sensitivity.
+   */
+  protected[this] def formatTableName(name: String): String = {
+    if (conf.caseSensitiveAnalysis) name else name.toLowerCase
+  }
 
   // ----------------------------------------------------------------------------
   // Databases
@@ -105,8 +123,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
    */
   def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
     val db = tableDefinition.name.database.getOrElse(currentDb)
-    val newTableDefinition = tableDefinition.copy(
-      name = TableIdentifier(tableDefinition.name.table, Some(db)))
+    val table = formatTableName(tableDefinition.name.table)
+    val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
     externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
   }
 
@@ -121,8 +139,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
    */
   def alterTable(tableDefinition: CatalogTable): Unit = {
     val db = tableDefinition.name.database.getOrElse(currentDb)
-    val newTableDefinition = tableDefinition.copy(
-      name = TableIdentifier(tableDefinition.name.table, Some(db)))
+    val table = formatTableName(tableDefinition.name.table)
+    val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
     externalCatalog.alterTable(db, newTableDefinition)
   }
 
@@ -132,7 +150,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
    */
   def getTable(name: TableIdentifier): CatalogTable = {
     val db = name.database.getOrElse(currentDb)
-    externalCatalog.getTable(db, name.table)
+    val table = formatTableName(name.table)
+    externalCatalog.getTable(db, table)
   }
 
   // -------------------------------------------------------------
@@ -146,10 +165,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
       name: String,
       tableDefinition: LogicalPlan,
       ignoreIfExists: Boolean): Unit = {
-    if (tempTables.containsKey(name) && !ignoreIfExists) {
+    val table = formatTableName(name)
+    if (tempTables.containsKey(table) && !ignoreIfExists) {
       throw new AnalysisException(s"Temporary table '$name' already exists.")
     }
-    tempTables.put(name, tableDefinition)
+    tempTables.put(table, tableDefinition)
   }
 
   /**
@@ -166,11 +186,13 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
       throw new AnalysisException("rename does not support moving tables across databases")
     }
     val db = oldName.database.getOrElse(currentDb)
-    if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) {
-      externalCatalog.renameTable(db, oldName.table, newName.table)
+    val oldTableName = formatTableName(oldName.table)
+    val newTableName = formatTableName(newName.table)
+    if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) {
+      externalCatalog.renameTable(db, oldTableName, newTableName)
     } else {
-      val table = tempTables.remove(oldName.table)
-      tempTables.put(newName.table, table)
+      val table = tempTables.remove(oldTableName)
+      tempTables.put(newTableName, table)
     }
   }
 
@@ -183,10 +205,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
    */
   def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
     val db = name.database.getOrElse(currentDb)
-    if (name.database.isDefined || !tempTables.containsKey(name.table)) {
-      externalCatalog.dropTable(db, name.table, ignoreIfNotExists)
+    val table = formatTableName(name.table)
+    if (name.database.isDefined || !tempTables.containsKey(table)) {
+      externalCatalog.dropTable(db, table, ignoreIfNotExists)
     } else {
-      tempTables.remove(name.table)
+      tempTables.remove(table)
     }
   }
 
@@ -199,29 +222,44 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
    */
   def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
     val db = name.database.getOrElse(currentDb)
+    val table = formatTableName(name.table)
     val relation =
-      if (name.database.isDefined || !tempTables.containsKey(name.table)) {
-        val metadata = externalCatalog.getTable(db, name.table)
+      if (name.database.isDefined || !tempTables.containsKey(table)) {
+        val metadata = externalCatalog.getTable(db, table)
         CatalogRelation(db, metadata, alias)
       } else {
-        tempTables.get(name.table)
+        tempTables.get(table)
       }
-    val qualifiedTable = SubqueryAlias(name.table, relation)
+    val qualifiedTable = SubqueryAlias(table, relation)
     // If an alias was specified by the lookup, wrap the plan in a subquery so that
     // attributes are properly qualified with this alias.
     alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
   }
 
   /**
-   * List all tables in the specified database, including temporary tables.
+   * Return whether a table with the specified name exists.
+   *
+   * Note: If a database is explicitly specified, then this will return whether the table
+   * exists in that particular database instead. In that case, even if there is a temporary
+   * table with the same name, we will return false if the specified database does not
+   * contain the table.
    */
-  def listTables(db: String): Seq[TableIdentifier] = {
-    val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) }
-    val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) }
-    dbTables ++ _tempTables
+  def tableExists(name: TableIdentifier): Boolean = {
+    val db = name.database.getOrElse(currentDb)
+    val table = formatTableName(name.table)
+    if (name.database.isDefined || !tempTables.containsKey(table)) {
+      externalCatalog.tableExists(db, table)
+    } else {
+      true // it's a temporary table
+    }
   }
 
   /**
+   * List all tables in the specified database, including temporary tables.
+   */
+  def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
+
+  /**
    * List all matching tables in the specified database, including temporary tables.
    */
   def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
@@ -235,6 +273,19 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
   }
 
   /**
+   * Refresh the cache entry for a metastore table, if any.
+   */
+  def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }
+
+  /**
+   * Drop all existing temporary tables.
+   * For testing only.
+   */
+  def clearTempTables(): Unit = {
+    tempTables.clear()
+  }
+
+  /**
    * Return a temporary table exactly as it was stored.
    * For testing only.
    */
@@ -263,7 +314,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
     val db = tableName.database.getOrElse(currentDb)
-    externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists)
+    val table = formatTableName(tableName.table)
+    externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
   }
 
   /**
@@ -275,7 +327,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = {
     val db = tableName.database.getOrElse(currentDb)
-    externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists)
+    val table = formatTableName(tableName.table)
+    externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
   }
 
   /**
@@ -289,7 +342,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = {
     val db = tableName.database.getOrElse(currentDb)
-    externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs)
+    val table = formatTableName(tableName.table)
+    externalCatalog.renamePartitions(db, table, specs, newSpecs)
   }
 
   /**
@@ -303,7 +357,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
    */
   def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
     val db = tableName.database.getOrElse(currentDb)
-    externalCatalog.alterPartitions(db, tableName.table, parts)
+    val table = formatTableName(tableName.table)
+    externalCatalog.alterPartitions(db, table, parts)
   }
 
   /**
@@ -312,7 +367,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
    */
   def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
     val db = tableName.database.getOrElse(currentDb)
-    externalCatalog.getPartition(db, tableName.table, spec)
+    val table = formatTableName(tableName.table)
+    externalCatalog.getPartition(db, table, spec)
   }
 
   /**
@@ -321,7 +377,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) {
    */
   def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = {
     val db = tableName.database.getOrElse(currentDb)
-    externalCatalog.listPartitions(db, tableName.table)
+    val table = formatTableName(tableName.table)
+    externalCatalog.listPartitions(db, table)
   }
 
   // ----------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index c4e4961..3480313 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -91,6 +91,8 @@ abstract class ExternalCatalog {
 
   def getTable(db: String, table: String): CatalogTable
 
+  def tableExists(db: String, table: String): Boolean
+
   def listTables(db: String): Seq[String]
 
   def listTables(db: String, pattern: String): Seq[String]

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 9563f43..346e052 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -161,14 +161,10 @@ class AnalysisSuite extends AnalysisTest {
   }
 
   test("resolve relations") {
-    assertAnalysisError(
-      UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe"))
-
+    assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq())
     checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
-
     checkAnalysis(
       UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false)
-
     checkAnalysis(
       UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 39166c4..6fa4bee 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -18,26 +18,21 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
 
 trait AnalysisTest extends PlanTest {
 
-  val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = {
-    val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
-    val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false)
+  protected val caseSensitiveAnalyzer = makeAnalyzer(caseSensitive = true)
+  protected val caseInsensitiveAnalyzer = makeAnalyzer(caseSensitive = false)
 
-    val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
-    val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
-
-    caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
-    caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
-
-    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) {
-      override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
-    } ->
-    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf) {
+  private def makeAnalyzer(caseSensitive: Boolean): Analyzer = {
+    val conf = new SimpleCatalystConf(caseSensitive)
+    val catalog = new SessionCatalog(new InMemoryCatalog, conf)
+    catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true)
+    new Analyzer(catalog, EmptyFunctionRegistry, conf) {
       override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index 9aa685e..3150186 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -30,11 +31,11 @@ import org.apache.spark.sql.types._
 
 
 class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
-  val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
-  val catalog = new SimpleCatalog(conf)
-  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
+  private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
+  private val catalog = new SessionCatalog(new InMemoryCatalog, conf)
+  private val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
 
-  val relation = LocalRelation(
+  private val relation = LocalRelation(
     AttributeReference("i", IntegerType)(),
     AttributeReference("d1", DecimalType(2, 1))(),
     AttributeReference("d2", DecimalType(5, 2))(),
@@ -43,15 +44,15 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
     AttributeReference("b", DoubleType)()
   )
 
-  val i: Expression = UnresolvedAttribute("i")
-  val d1: Expression = UnresolvedAttribute("d1")
-  val d2: Expression = UnresolvedAttribute("d2")
-  val u: Expression = UnresolvedAttribute("u")
-  val f: Expression = UnresolvedAttribute("f")
-  val b: Expression = UnresolvedAttribute("b")
+  private val i: Expression = UnresolvedAttribute("i")
+  private val d1: Expression = UnresolvedAttribute("d1")
+  private val d2: Expression = UnresolvedAttribute("d2")
+  private val u: Expression = UnresolvedAttribute("u")
+  private val f: Expression = UnresolvedAttribute("f")
+  private val b: Expression = UnresolvedAttribute("b")
 
   before {
-    catalog.registerTable(TableIdentifier("table"), relation)
+    catalog.createTempTable("table", relation, ignoreIfExists = true)
   }
 
   private def checkType(expression: Expression, expectedType: DataType): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index a1ea619..277c2d7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -225,13 +225,14 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
 
   test("list tables without pattern") {
     val catalog = newBasicCatalog()
+    intercept[AnalysisException] { catalog.listTables("unknown_db") }
     assert(catalog.listTables("db1").toSet == Set.empty)
     assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
   }
 
   test("list tables with pattern") {
     val catalog = newBasicCatalog()
-    intercept[AnalysisException] { catalog.listTables("unknown_db") }
+    intercept[AnalysisException] { catalog.listTables("unknown_db", "*") }
     assert(catalog.listTables("db1", "*").toSet == Set.empty)
     assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2"))
     assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2"))

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index e1973ee..74e995c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -397,6 +397,24 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias)
   }
 
+  test("table exists") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2"))))
+    assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2"))))
+    assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
+    assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1"))))
+    assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
+    // If database is explicitly specified, do not check temporary tables
+    val tempTable = Range(1, 10, 1, 10, Seq())
+    catalog.createTempTable("tbl3", tempTable, ignoreIfExists = false)
+    assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
+    // If database is not explicitly specified, check the current database
+    catalog.setCurrentDatabase("db2")
+    assert(catalog.tableExists(TableIdentifier("tbl1")))
+    assert(catalog.tableExists(TableIdentifier("tbl2")))
+    assert(catalog.tableExists(TableIdentifier("tbl3")))
+  }
+
   test("list tables without pattern") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val tempTable = Range(1, 10, 2, 10, Seq())
@@ -429,7 +447,7 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(catalog.listTables("db2", "*1").toSet ==
       Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2"))))
     intercept[AnalysisException] {
-      catalog.listTables("unknown_db")
+      catalog.listTables("unknown_db", "*")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index 2ab31ee..e2c76b7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -137,11 +138,11 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
     checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d))
   }
 
-  private val caseInsensitiveAnalyzer =
-    new Analyzer(
-      EmptyCatalog,
-      EmptyFunctionRegistry,
-      new SimpleCatalystConf(caseSensitiveAnalysis = false))
+  private val caseInsensitiveConf = new SimpleCatalystConf(false)
+  private val caseInsensitiveAnalyzer = new Analyzer(
+    new SessionCatalog(new InMemoryCatalog, caseInsensitiveConf),
+    EmptyFunctionRegistry,
+    caseInsensitiveConf)
 
   test("(a && b) || (a && c) => a && (b || c) when case insensitive") {
     val plan = caseInsensitiveAnalyzer.execute(

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index a4c8d1c..3824c67 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.SimpleCatalystConf
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -28,7 +29,7 @@ import org.apache.spark.sql.catalyst.rules._
 
 class EliminateSortsSuite extends PlanTest {
   val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
-  val catalog = new SimpleCatalog(conf)
+  val catalog = new SessionCatalog(new InMemoryCatalog, conf)
   val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
 
   object Optimize extends RuleExecutor[LogicalPlan] {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 853a74c..e413e77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -25,13 +25,14 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalyst._
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
@@ -65,13 +66,14 @@ class SQLContext private[sql](
     @transient val sparkContext: SparkContext,
     @transient protected[sql] val cacheManager: CacheManager,
     @transient private[sql] val listener: SQLListener,
-    val isRootContext: Boolean)
+    val isRootContext: Boolean,
+    @transient private[sql] val externalCatalog: ExternalCatalog)
   extends Logging with Serializable {
 
   self =>
 
-  def this(sparkContext: SparkContext) = {
-    this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true)
+  def this(sc: SparkContext) = {
+    this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog)
   }
 
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -109,7 +111,8 @@ class SQLContext private[sql](
       sparkContext = sparkContext,
       cacheManager = cacheManager,
       listener = listener,
-      isRootContext = false)
+      isRootContext = false,
+      externalCatalog = externalCatalog)
   }
 
   /**
@@ -186,6 +189,12 @@ class SQLContext private[sql](
    */
   def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
 
+  // Extract `spark.sql.*` entries and put them in our SQLConf.
+  // Subclasses may additionally set these entries in other confs.
+  SQLContext.getSQLProperties(sparkContext.getConf).asScala.foreach { case (k, v) =>
+    setConf(k, v)
+  }
+
   protected[sql] def parseSql(sql: String): LogicalPlan = sessionState.sqlParser.parsePlan(sql)
 
   protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql))
@@ -199,30 +208,6 @@ class SQLContext private[sql](
     sparkContext.addJar(path)
   }
 
-  {
-    // We extract spark sql settings from SparkContext's conf and put them to
-    // Spark SQL's conf.
-    // First, we populate the SQLConf (conf). So, we can make sure that other values using
-    // those settings in their construction can get the correct settings.
-    // For example, metadataHive in HiveContext may need both spark.sql.hive.metastore.version
-    // and spark.sql.hive.metastore.jars to get correctly constructed.
-    val properties = new Properties
-    sparkContext.getConf.getAll.foreach {
-      case (key, value) if key.startsWith("spark.sql") => properties.setProperty(key, value)
-      case _ =>
-    }
-    // We directly put those settings to conf to avoid of calling setConf, which may have
-    // side-effects. For example, in HiveContext, setConf may cause executionHive and metadataHive
-    // get constructed. If we call setConf directly, the constructed metadataHive may have
-    // wrong settings, or the construction may fail.
-    conf.setConf(properties)
-    // After we have populated SQLConf, we call setConf to populate other confs in the subclass
-    // (e.g. hiveconf in HiveContext).
-    properties.asScala.foreach {
-      case (key, value) => setConf(key, value)
-    }
-  }
-
   /**
    * :: Experimental ::
    * A collection of methods that are considered experimental, but can be used to hook into
@@ -683,8 +668,10 @@ class SQLContext private[sql](
    * only during the lifetime of this instance of SQLContext.
    */
   private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
-    sessionState.catalog.registerTable(
-      sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan)
+    sessionState.catalog.createTempTable(
+      sessionState.sqlParser.parseTableIdentifier(tableName).table,
+      df.logicalPlan,
+      ignoreIfExists = true)
   }
 
   /**
@@ -697,7 +684,7 @@ class SQLContext private[sql](
    */
   def dropTempTable(tableName: String): Unit = {
     cacheManager.tryUncacheQuery(table(tableName))
-    sessionState.catalog.unregisterTable(TableIdentifier(tableName))
+    sessionState.catalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true)
   }
 
   /**
@@ -824,9 +811,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tableNames(): Array[String] = {
-    sessionState.catalog.getTables(None).map {
-      case (tableName, _) => tableName
-    }.toArray
+    tableNames(sessionState.catalog.getCurrentDatabase)
   }
 
   /**
@@ -836,9 +821,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tableNames(databaseName: String): Array[String] = {
-    sessionState.catalog.getTables(Some(databaseName)).map {
-      case (tableName, _) => tableName
-    }.toArray
+    sessionState.catalog.listTables(databaseName).map(_.table).toArray
   }
 
   @transient
@@ -1025,4 +1008,18 @@ object SQLContext {
     }
     sqlListener.get()
   }
+
+  /**
+   * Extract `spark.sql.*` properties from the conf and return them as a [[Properties]].
+   */
+  private[sql] def getSQLProperties(sparkConf: SparkConf): Properties = {
+    val properties = new Properties
+    sparkConf.getAll.foreach { case (key, value) =>
+      if (key.startsWith("spark.sql")) {
+        properties.setProperty(key, value)
+      }
+    }
+    properties
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 59c3ffc..964f0a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -339,10 +339,12 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableComma
   override def run(sqlContext: SQLContext): Seq[Row] = {
     // Since we need to return a Seq of rows, we will call getTables directly
     // instead of calling tables in sqlContext.
-    val rows = sqlContext.sessionState.catalog.getTables(databaseName).map {
-      case (tableName, isTemporary) => Row(tableName, isTemporary)
+    val catalog = sqlContext.sessionState.catalog
+    val db = databaseName.getOrElse(catalog.getCurrentDatabase)
+    val rows = catalog.listTables(db).map { t =>
+      val isTemp = t.database.isEmpty
+      Row(t.table, isTemp)
     }
-
     rows
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 9e8e035..24923bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -93,15 +93,21 @@ case class CreateTempTableUsing(
     provider: String,
     options: Map[String, String]) extends RunnableCommand {
 
+  if (tableIdent.database.isDefined) {
+    throw new AnalysisException(
+      s"Temporary table '$tableIdent' should not have specified a database")
+  }
+
   def run(sqlContext: SQLContext): Seq[Row] = {
     val dataSource = DataSource(
       sqlContext,
       userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       options = options)
-    sqlContext.sessionState.catalog.registerTable(
-      tableIdent,
-      Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
+    sqlContext.sessionState.catalog.createTempTable(
+      tableIdent.table,
+      Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
+      ignoreIfExists = true)
 
     Seq.empty[Row]
   }
@@ -115,6 +121,11 @@ case class CreateTempTableUsingAsSelect(
     options: Map[String, String],
     query: LogicalPlan) extends RunnableCommand {
 
+  if (tableIdent.database.isDefined) {
+    throw new AnalysisException(
+      s"Temporary table '$tableIdent' should not have specified a database")
+  }
+
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val df = Dataset.ofRows(sqlContext, query)
     val dataSource = DataSource(
@@ -124,9 +135,10 @@ case class CreateTempTableUsingAsSelect(
       bucketSpec = None,
       options = options)
     val result = dataSource.write(mode, df)
-    sqlContext.sessionState.catalog.registerTable(
-      tableIdent,
-      Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan)
+    sqlContext.sessionState.catalog.createTempTable(
+      tableIdent.table,
+      Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan,
+      ignoreIfExists = true)
 
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 63f0e4f..28ac458 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
 
 /**
@@ -99,7 +101,9 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
 /**
  * A rule to do various checks before inserting into or writing to a data source table.
  */
-private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) {
+private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
+  extends (LogicalPlan => Unit) {
+
   def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
 
   def apply(plan: LogicalPlan): Unit = {
@@ -139,7 +143,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         }
 
         PartitioningUtils.validatePartitionColumnDataTypes(
-          r.schema, part.keySet.toSeq, catalog.conf.caseSensitiveAnalysis)
+          r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis)
 
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
@@ -190,7 +194,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         }
 
         PartitioningUtils.validatePartitionColumnDataTypes(
-          c.child.schema, c.partitionColumns, catalog.conf.caseSensitiveAnalysis)
+          c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
 
         for {
           spec <- c.bucketSpec

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index e6be0ab..e5f02ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.internal
 
 import org.apache.spark.sql.{ContinuousQueryManager, ExperimentalMethods, SQLContext, UDFRegistration}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -45,7 +46,7 @@ private[sql] class SessionState(ctx: SQLContext) {
   /**
    * Internal catalog for managing table and database states.
    */
-  lazy val catalog: Catalog = new SimpleCatalog(conf)
+  lazy val catalog = new SessionCatalog(ctx.externalCatalog, conf)
 
   /**
    * Internal catalog for managing functions registered by the user.
@@ -68,7 +69,7 @@ private[sql] class SessionState(ctx: SQLContext) {
         DataSourceAnalysis ::
         (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
 
-      override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog))
+      override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
index 2820e4f..bb54c52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
@@ -33,7 +33,8 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
   }
 
   after {
-    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+    sqlContext.sessionState.catalog.dropTable(
+      TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
   }
 
   test("get all tables") {
@@ -45,20 +46,22 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
       sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
-    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+    sqlContext.sessionState.catalog.dropTable(
+      TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }
 
-  test("getting all Tables with a database name has no impact on returned table names") {
+  test("getting all tables with a database name has no impact on returned table names") {
     checkAnswer(
-      sqlContext.tables("DB").filter("tableName = 'ListTablesSuiteTable'"),
+      sqlContext.tables("default").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
     checkAnswer(
-      sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
+      sql("show TABLES in default").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
-    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+    sqlContext.sessionState.catalog.dropTable(
+      TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 2ad92b5..2f62ad4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
 
-class SQLContextSuite extends SparkFunSuite with SharedSparkContext{
+class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
 
   object DummyRule extends Rule[LogicalPlan] {
     def apply(p: LogicalPlan): LogicalPlan = p
@@ -78,4 +78,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext{
     sqlContext.experimental.extraOptimizations = Seq(DummyRule)
     assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule))
   }
+
+  test("SQLContext can access `spark.sql.*` configs") {
+    sc.conf.set("spark.sql.with.or.without.you", "my love")
+    val sqlContext = new SQLContext(sc)
+    assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 077e579..c958eac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1476,12 +1476,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-4699 case sensitivity SQL query") {
-    sqlContext.setConf(SQLConf.CASE_SENSITIVE, false)
-    val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
-    val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
-    rdd.toDF().registerTempTable("testTable1")
-    checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
-    sqlContext.setConf(SQLConf.CASE_SENSITIVE, true)
+    val orig = sqlContext.getConf(SQLConf.CASE_SENSITIVE)
+    try {
+      sqlContext.setConf(SQLConf.CASE_SENSITIVE, false)
+      val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
+      val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
+      rdd.toDF().registerTempTable("testTable1")
+      checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
+    } finally {
+      sqlContext.setConf(SQLConf.CASE_SENSITIVE, orig)
+    }
   }
 
   test("SPARK-6145: ORDER BY test for nested fields") {
@@ -1755,7 +1759,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         .format("parquet")
         .save(path)
 
-      val message = intercept[AnalysisException] {
+      // We don't support creating a temporary table while specifying a database
+      intercept[AnalysisException] {
         sqlContext.sql(
           s"""
           |CREATE TEMPORARY TABLE db.t
@@ -1765,9 +1770,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
           |)
         """.stripMargin)
       }.getMessage
-      assert(message.contains("Specifying database name or other qualifiers are not allowed"))
 
-      // If you use backticks to quote the name of a temporary table having dot in it.
+      // If you use backticks to quote the name then it's OK.
       sqlContext.sql(
         s"""
           |CREATE TEMPORARY TABLE `db.t`

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index f8166c7..2f806eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -51,7 +51,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
     }
-    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
+    sqlContext.sessionState.catalog.dropTable(
+      TableIdentifier("tmp"), ignoreIfNotExists = true)
   }
 
   test("overwriting") {
@@ -61,7 +62,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
     }
-    sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
+    sqlContext.sessionState.catalog.dropTable(
+      TableIdentifier("tmp"), ignoreIfNotExists = true)
   }
 
   test("self-join") {

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index d483585..80a85a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -189,8 +189,8 @@ private[sql] trait SQLTestUtils
    * `f` returns.
    */
   protected def activateDatabase(db: String)(f: => Unit): Unit = {
-    sqlContext.sql(s"USE $db")
-    try f finally sqlContext.sql(s"USE default")
+    sqlContext.sessionState.catalog.setCurrentDatabase(db)
+    try f finally sqlContext.sessionState.catalog.setCurrentDatabase("default")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 7fe31b0..5769328 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -150,7 +150,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     }
 
     if (sessionState.database != null) {
-      SparkSQLEnv.hiveContext.runSqlHive(s"USE ${sessionState.database}")
+      SparkSQLEnv.hiveContext.sessionState.catalog.setCurrentDatabase(
+        s"${sessionState.database}")
     }
 
     // Execute -i init files (always in silent mode)

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 032965d..8e1ebe2 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -193,10 +193,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
     )
 
     runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "SHOW TABLES;"))(
-      ""
-        -> "OK",
-      ""
-        -> "hive_test"
+      "" -> "hive_test"
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 05f59f1..650797f 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -60,16 +60,19 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   }
 
   override def afterAll() {
-    TestHive.cacheTables = false
-    TimeZone.setDefault(originalTimeZone)
-    Locale.setDefault(originalLocale)
-    TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
-    TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
-    TestHive.sessionState.functionRegistry.restore()
-
-    // For debugging dump some statistics about how much time was spent in various optimizer rules.
-    logWarning(RuleExecutor.dumpTimeSpent())
-    super.afterAll()
+    try {
+      TestHive.cacheTables = false
+      TimeZone.setDefault(originalTimeZone)
+      Locale.setDefault(originalLocale)
+      TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
+      TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
+      TestHive.sessionState.functionRegistry.restore()
+
+      // For debugging dump some statistics about how much time was spent in various optimizer rules.
+      logWarning(RuleExecutor.dumpTimeSpent())
+    } finally {
+      super.afterAll()
+    }
   }
 
   /** A list of tests deemed out of scope currently and thus completely disregarded. */

http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
index 491f2ae..0722fb0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
@@ -85,7 +85,6 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
     withClient { getTable(db, table) }
   }
 
-
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -182,6 +181,10 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
     client.getTable(db, table)
   }
 
+  override def tableExists(db: String, table: String): Boolean = withClient {
+    client.getTableOption(db, table).isDefined
+  }
+
   override def listTables(db: String): Seq[String] = withClient {
     requireDbExists(db)
     client.listTables(db)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org