You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/01/19 08:07:56 UTC

spark git commit: [SPARK-19265][SQL] make table relation cache general and does not depend on hive

Repository: spark
Updated Branches:
  refs/heads/master 0c9231858 -> 2e6256002


[SPARK-19265][SQL] make table relation cache general and does not depend on hive

## What changes were proposed in this pull request?

We have a table relation plan cache in `HiveMetastoreCatalog`, which caches a lot of things: file status, resolved data source, inferred schema, etc.

However, it doesn't make sense to limit this cache with hive support, we should move it to SQL core module so that users can use this cache without hive support.

It can also reduce the size of `HiveMetastoreCatalog`, so that it's easier to remove it eventually.

main changes:
1. move the table relation cache to `SessionCatalog`
2. `SessionCatalog.lookupRelation` will return `SimpleCatalogRelation` and the analyzer will convert it to `LogicalRelation` or `MetastoreRelation` later, then `HiveSessionCatalog` doesn't need to override `lookupRelation` anymore
3. `FindDataSourceTable` will read/write the table relation cache.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #16621 from cloud-fan/plan-cache.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e625600
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e625600
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e625600

Branch: refs/heads/master
Commit: 2e62560024999c215cf2373fc9a8070bb2ad5c58
Parents: 0c92318
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Jan 19 00:07:48 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Jan 19 00:07:48 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   | 33 +++++--
 .../apache/spark/sql/catalyst/identifiers.scala |  4 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  |  6 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  4 +-
 .../command/AnalyzeColumnCommand.scala          |  3 +-
 .../execution/command/AnalyzeTableCommand.scala |  3 +-
 .../spark/sql/execution/command/tables.scala    |  2 +-
 .../datasources/DataSourceStrategy.scala        | 60 ++++++------
 .../apache/spark/sql/internal/CatalogImpl.scala |  2 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 11 ---
 .../spark/sql/execution/command/DDLSuite.scala  |  3 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 98 ++------------------
 .../spark/sql/hive/HiveSessionCatalog.scala     | 37 +-------
 .../spark/sql/hive/HiveSessionState.scala       |  2 +
 .../apache/spark/sql/hive/HiveStrategies.scala  | 17 +++-
 .../CreateHiveTableAsSelectCommand.scala        |  8 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |  2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 22 +++++
 .../apache/spark/sql/hive/StatisticsSuite.scala |  8 +-
 .../sql/hive/execution/SQLQuerySuite.scala      | 17 ++--
 20 files changed, 144 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8008fcd..e9543f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -21,13 +21,13 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
 
+import com.google.common.cache.{Cache, CacheBuilder}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
@@ -118,6 +118,14 @@ class SessionCatalog(
   }
 
   /**
+   * A cache of qualified table name to table relation plan.
+   */
+  val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
+    // TODO: create a config instead of hardcode 1000 here.
+    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, LogicalPlan]()
+  }
+
+  /**
    * This method is used to make the given path qualified before we
    * store this path in the underlying external catalog. So, when a path
    * does not contain a scheme, this path will not be changed after the default
@@ -573,7 +581,7 @@ class SessionCatalog(
       val relationAlias = alias.getOrElse(table)
       if (db == globalTempViewManager.database) {
         globalTempViewManager.get(table).map { viewDef =>
-          SubqueryAlias(relationAlias, viewDef, Some(name))
+          SubqueryAlias(relationAlias, viewDef, None)
         }.getOrElse(throw new NoSuchTableException(db, table))
       } else if (name.database.isDefined || !tempTables.contains(table)) {
         val metadata = externalCatalog.getTable(db, table)
@@ -586,12 +594,12 @@ class SessionCatalog(
             desc = metadata,
             output = metadata.schema.toAttributes,
             child = parser.parsePlan(viewText))
-          SubqueryAlias(relationAlias, child, Option(name))
+          SubqueryAlias(relationAlias, child, Some(name.copy(table = table, database = Some(db))))
         } else {
           SubqueryAlias(relationAlias, SimpleCatalogRelation(metadata), None)
         }
       } else {
-        SubqueryAlias(relationAlias, tempTables(table), Option(name))
+        SubqueryAlias(relationAlias, tempTables(table), None)
       }
     }
   }
@@ -651,14 +659,21 @@ class SessionCatalog(
    * Refresh the cache entry for a metastore table, if any.
    */
   def refreshTable(name: TableIdentifier): Unit = synchronized {
+    val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
+    val tableName = formatTableName(name.table)
+
     // Go through temporary tables and invalidate them.
-    // If the database is defined, this is definitely not a temp table.
+    // If the database is defined, this may be a global temporary view.
     // If the database is not defined, there is a good chance this is a temp table.
     if (name.database.isEmpty) {
-      tempTables.get(formatTableName(name.table)).foreach(_.refresh())
-    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
-      globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh())
+      tempTables.get(tableName).foreach(_.refresh())
+    } else if (dbName == globalTempViewManager.database) {
+      globalTempViewManager.get(tableName).foreach(_.refresh())
     }
+
+    // Also invalidate the table relation cache.
+    val qualifiedTableName = QualifiedTableName(dbName, tableName)
+    tableRelationCache.invalidate(qualifiedTableName)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 834897b..26697e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -60,9 +60,11 @@ case class TableIdentifier(table: String, database: Option[String])
   override val identifier: String = table
 
   def this(table: String) = this(table, None)
-
 }
 
+/** A fully qualified identifier for a table (i.e., database.tableName) */
+case class QualifiedTableName(database: String, name: String)
+
 object TableIdentifier {
   def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 7a7de25..f935de6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -436,7 +436,7 @@ class SessionCatalogSuite extends PlanTest {
       == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
     // Otherwise, we'll first look up a temporary table with the same name
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
-      == SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1"))))
+      == SubqueryAlias("tbl1", tempTable1, None))
     // Then, if that does not exist, look up the relation in the current database
     sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
@@ -462,7 +462,7 @@ class SessionCatalogSuite extends PlanTest {
     val tmpView = Range(1, 10, 2, 10)
     catalog.createTempView("vw1", tmpView, overrideIfExists = false)
     val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range"))
-    assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1"))))
+    assert(plan == SubqueryAlias("range", tmpView, None))
   }
 
   test("look up view relation") {
@@ -479,7 +479,7 @@ class SessionCatalogSuite extends PlanTest {
     // Look up a view using current database of the session catalog.
     sessionCatalog.setCurrentDatabase("db3")
     comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1")),
-      SubqueryAlias("view1", view, Some(TableIdentifier("view1"))))
+      SubqueryAlias("view1", view, Some(TableIdentifier("view1", Some("db3")))))
   }
 
   test("table exists") {

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7fc03bd..ff1f017 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -386,7 +386,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
             relation.catalogTable.identifier
         }
-        EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
+
+        val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
+        EliminateSubqueryAliases(tableRelation) match {
           // check if the table is a data source table (the relation is a BaseRelation).
           case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 1340c9b..d024a36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -40,7 +40,8 @@ case class AnalyzeColumnCommand(
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+    val relation =
+      EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
 
     // Compute total size
     val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 4a994e3..30b6cc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -41,7 +41,8 @@ case class AnalyzeTableCommand(
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+    val relation =
+      EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
 
     relation match {
       case relation: CatalogRelation =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 2468948..1b596c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -450,7 +450,7 @@ case class DescribeTableCommand(
       if (metadata.schema.isEmpty) {
         // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
         // inferred at runtime. We should still support it.
-        describeSchema(catalog.lookupRelation(metadata.identifier).schema, result)
+        describeSchema(sparkSession.table(metadata.identifier).schema, result)
       } else {
         describeSchema(metadata.schema, result)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 3d3db06..21b07ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,18 +17,17 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import scala.collection.mutable.ArrayBuffer
+import java.util.concurrent.Callable
 
-import org.apache.hadoop.fs.Path
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -37,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPa
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -215,37 +215,43 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
 
 /**
- * Replaces [[SimpleCatalogRelation]] with data source table if its table property contains data
- * source information.
+ * Replaces [[SimpleCatalogRelation]] with data source table if its table provider is not hive.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(
-      sparkSession: SparkSession,
-      simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
-    val table = simpleCatalogRelation.catalogTable
-    val pathOption = table.storage.locationUri.map("path" -> _)
-    val dataSource =
-      DataSource(
-        sparkSession,
-        userSpecifiedSchema = Some(table.schema),
-        partitionColumns = table.partitionColumnNames,
-        bucketSpec = table.bucketSpec,
-        className = table.provider.get,
-        options = table.storage.properties ++ pathOption)
-
-    LogicalRelation(
-      dataSource.resolveRelation(),
-      expectedOutputAttributes = Some(simpleCatalogRelation.output),
-      catalogTable = Some(table))
+  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
+    val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
+    val cache = sparkSession.sessionState.catalog.tableRelationCache
+    val withHiveSupport =
+      sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"
+
+    cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+      override def call(): LogicalPlan = {
+        val pathOption = table.storage.locationUri.map("path" -> _)
+        val dataSource =
+          DataSource(
+            sparkSession,
+            // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
+            // inferred at runtime. We should still support it.
+            userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+            partitionColumns = table.partitionColumnNames,
+            bucketSpec = table.bucketSpec,
+            className = table.provider.get,
+            options = table.storage.properties ++ pathOption,
+            // TODO: improve `InMemoryCatalog` and remove this limitation.
+            catalogTable = if (withHiveSupport) Some(table) else None)
+
+        LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
+      }
+    })
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
         if DDLUtils.isDatasourceTable(s.metadata) =>
-      i.copy(table = readDataSourceTable(sparkSession, s))
+      i.copy(table = readDataSourceTable(s.metadata))
 
     case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-      readDataSourceTable(sparkSession, s)
+      readDataSourceTable(s.metadata)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 9136a83..3d9f418 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -440,7 +440,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
 
     // If this table is cached as an InMemoryRelation, drop the original
     // cached version and make the new version cached lazily.
-    val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
+    val logicalPlan = sparkSession.table(tableIdent).queryExecution.analyzed
     // Use lookupCachedData directly since RefreshTable also takes databaseName.
     val isCached = sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f4df80f..621a46a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1626,17 +1626,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(d.size == d.distinct.size)
   }
 
-  test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") {
-    val tableName = "tbl"
-    withTable(tableName) {
-      spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName)
-      val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName))
-      val expr = relation.resolve("i")
-      val qe = spark.sessionState.executePlan(Project(Seq(expr), relation))
-      qe.assertAnalyzed()
-    }
-  }
-
   private def verifyNullabilityInFilterExec(
       df: DataFrame,
       expr: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 97990a6..b4c9e27 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1790,7 +1790,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("SET LOCATION for managed table") {
-    withTable("src") {
+    withTable("tbl") {
       withTempDir { dir =>
         sql("CREATE TABLE tbl(i INT) USING parquet")
         sql("INSERT INTO tbl SELECT 1")
@@ -1799,6 +1799,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
           .getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get
 
         sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'")
+        spark.catalog.refreshTable("tbl")
         // SET LOCATION won't move data from previous table path to new table path.
         assert(spark.table("tbl").count() == 0)
         // the previous table path should be still there.

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index e4b1f6a..faa76b7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,17 +17,15 @@
 
 package org.apache.spark.sql.hive
 
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -41,9 +39,7 @@ import org.apache.spark.sql.types._
  */
 private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
   private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
-
-  /** A fully qualified identifier for a table (i.e., database.tableName) */
-  case class QualifiedTableName(database: String, name: String)
+  private lazy val tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
 
   private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
 
@@ -65,45 +61,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
-  /** A cache of Spark SQL data source tables that have been accessed. */
-  protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
-    val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
-      override def load(in: QualifiedTableName): LogicalPlan = {
-        logDebug(s"Creating new cached data source for $in")
-        val table = sparkSession.sharedState.externalCatalog.getTable(in.database, in.name)
-
-        val pathOption = table.storage.locationUri.map("path" -> _)
-        val dataSource =
-          DataSource(
-            sparkSession,
-            // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
-            // inferred at runtime. We should still support it.
-            userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
-            partitionColumns = table.partitionColumnNames,
-            bucketSpec = table.bucketSpec,
-            className = table.provider.get,
-            options = table.storage.properties ++ pathOption,
-            catalogTable = Some(table))
-
-        LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
-      }
-    }
-
-    CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
-  }
-
-  def refreshTable(tableIdent: TableIdentifier): Unit = {
-    // refreshTable does not eagerly reload the cache. It just invalidate the cache.
-    // Next time when we use the table, it will be populated in the cache.
-    // Since we also cache ParquetRelations converted from Hive Parquet tables and
-    // adding converted ParquetRelations into the cache is not defined in the load function
-    // of the cache (instead, we add the cache entry in convertToParquetRelation),
-    // it is better at here to invalidate the cache to avoid confusing waring logs from the
-    // cache loader (e.g. cannot find data source provider, which is only defined for
-    // data source table.).
-    cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
-  }
-
   def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
     // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
     val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
@@ -111,45 +68,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     new Path(new Path(dbLocation), tblName).toString
   }
 
-  /**
-   * Returns a [[LogicalPlan]] that represents the given table or view from Hive metastore.
-   *
-   * @param tableIdent The name of the table/view that we look up.
-   * @param alias The alias name of the table/view that we look up.
-   * @return a [[LogicalPlan]] that represents the given table or view from Hive metastore.
-   */
-  def lookupRelation(
-      tableIdent: TableIdentifier,
-      alias: Option[String]): LogicalPlan = {
-    val qualifiedTableName = getQualifiedTableName(tableIdent)
-    val table = sparkSession.sharedState.externalCatalog.getTable(
-      qualifiedTableName.database, qualifiedTableName.name)
-
-    if (DDLUtils.isDatasourceTable(table)) {
-      val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
-      val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None)
-      // Then, if alias is specified, wrap the table with a Subquery using the alias.
-      // Otherwise, wrap the table with a Subquery using the table name.
-      alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
-    } else if (table.tableType == CatalogTableType.VIEW) {
-      val tableIdentifier = table.identifier
-      val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
-      // The relation is a view, so we wrap the relation by:
-      // 1. Add a [[View]] operator over the relation to keep track of the view desc;
-      // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
-      val child = View(
-        desc = table,
-        output = table.schema.toAttributes,
-        child = sparkSession.sessionState.sqlParser.parsePlan(viewText))
-      SubqueryAlias(alias.getOrElse(tableIdentifier.table), child, Option(tableIdentifier))
-    } else {
-      val qualifiedTable =
-        MetastoreRelation(
-          qualifiedTableName.database, qualifiedTableName.name)(table, sparkSession)
-      alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
-    }
-  }
-
   private def getCached(
       tableIdentifier: QualifiedTableName,
       pathsInMetastore: Seq[Path],
@@ -159,7 +77,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       expectedBucketSpec: Option[BucketSpec],
       partitionSchema: Option[StructType]): Option[LogicalRelation] = {
 
-    cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+    tableRelationCache.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
@@ -178,7 +96,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               Some(logical)
             } else {
               // If the cached relation is not updated, we invalidate it right away.
-              cachedDataSourceTables.invalidate(tableIdentifier)
+              tableRelationCache.invalidate(tableIdentifier)
               None
             }
           case _ =>
@@ -187,7 +105,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
                 s"should be stored as $expectedFileFormat. However, we are getting " +
                 s"a ${relation.fileFormat} from the metastore cache. This cached " +
                 s"entry will be invalidated.")
-            cachedDataSourceTables.invalidate(tableIdentifier)
+            tableRelationCache.invalidate(tableIdentifier)
             None
         }
       case other =>
@@ -195,7 +113,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
             s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " +
             s"This cached entry will be invalidated.")
-        cachedDataSourceTables.invalidate(tableIdentifier)
+        tableRelationCache.invalidate(tableIdentifier)
         None
     }
   }
@@ -270,7 +188,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
           val created = LogicalRelation(relation,
             catalogTable = Some(metastoreRelation.catalogTable))
-          cachedDataSourceTables.put(tableIdentifier, created)
+          tableRelationCache.put(tableIdentifier, created)
           created
         }
 
@@ -298,7 +216,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
                 className = fileType).resolveRelation(),
               catalogTable = Some(metastoreRelation.catalogTable))
 
-          cachedDataSourceTables.put(tableIdentifier, created)
+          tableRelationCache.put(tableIdentifier, created)
           created
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index b3cbbed..44ef5cc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -27,12 +27,12 @@ import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, Gener
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchTableException}
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.internal.SQLConf
@@ -58,28 +58,6 @@ private[sql] class HiveSessionCatalog(
     hadoopConf,
     parser) {
 
-  override def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
-    synchronized {
-      val table = formatTableName(name.table)
-      val db = formatDatabaseName(name.database.getOrElse(currentDb))
-      if (db == globalTempViewManager.database) {
-        val relationAlias = alias.getOrElse(table)
-        globalTempViewManager.get(table).map { viewDef =>
-          SubqueryAlias(relationAlias, viewDef, Some(name))
-        }.getOrElse(throw new NoSuchTableException(db, table))
-      } else if (name.database.isDefined || !tempTables.contains(table)) {
-        val newName = name.copy(database = Some(db), table = table)
-        metastoreCatalog.lookupRelation(newName, alias)
-      } else {
-        val relation = tempTables(table)
-        val tableWithQualifiers = SubqueryAlias(table, relation, None)
-        // If an alias was specified by the lookup, wrap the plan in a subquery so that
-        // attributes are properly qualified with this alias.
-        alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
-      }
-    }
-  }
-
   // ----------------------------------------------------------------
   // | Methods and fields for interacting with HiveMetastoreCatalog |
   // ----------------------------------------------------------------
@@ -93,15 +71,6 @@ private[sql] class HiveSessionCatalog(
   val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
   val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
 
-  override def refreshTable(name: TableIdentifier): Unit = {
-    super.refreshTable(name)
-    metastoreCatalog.refreshTable(name)
-  }
-
-  def invalidateCache(): Unit = {
-    metastoreCatalog.cachedDataSourceTables.invalidateAll()
-  }
-
   def hiveDefaultTableFilePath(name: TableIdentifier): String = {
     metastoreCatalog.hiveDefaultTableFilePath(name)
   }
@@ -109,7 +78,7 @@ private[sql] class HiveSessionCatalog(
   // For testing only
   private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
     val key = metastoreCatalog.getQualifiedTableName(table)
-    metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
+    sparkSession.sessionState.catalog.tableRelationCache.getIfPresent(key)
   }
 
   override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 4e30d03..d3cef6e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -67,6 +67,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
         DataSourceAnalysis(conf) ::
         new DetermineHiveSerde(conf) ::
         new HiveAnalysis(sparkSession) ::
+        new FindDataSourceTable(sparkSession) ::
+        new FindHiveSerdeTable(sparkSession) ::
         new ResolveDataSource(sparkSession) :: Nil
 
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 7987a0a..b649612 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
@@ -127,6 +127,21 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces [[SimpleCatalogRelation]] with [[MetastoreRelation]] if its table provider is hive.
+ */
+class FindHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
+        if DDLUtils.isHiveTable(s.metadata) =>
+      i.copy(table =
+        MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session))
+
+    case s: SimpleCatalogRelation if DDLUtils.isHiveTable(s.metadata) =>
+      MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session)
+  }
+}
+
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SparkPlanner =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index ef5a5a0..ccc2d64 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hive.MetastoreRelation
 
@@ -73,7 +73,9 @@ case class CreateHiveTableAsSelectCommand(
 
       // Get the Metastore Relation
       sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
-        case r: MetastoreRelation => r
+        case SubqueryAlias(_, r: SimpleCatalogRelation, _) =>
+          val tableMeta = r.metadata
+          MetastoreRelation(tableMeta.database, tableMeta.identifier.table)(tableMeta, sparkSession)
       }
     }
     // TODO ideally, we should get the output data ready first and then

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index dcb8e49..3267c23 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -431,7 +431,7 @@ private[hive] class TestHiveSparkSession(
       sharedState.cacheManager.clearCache()
       loadedTables.clear()
       sessionState.catalog.clearTempTables()
-      sessionState.catalog.invalidateCache()
+      sessionState.catalog.tableRelationCache.invalidateAll()
 
       sessionState.metadataHive.reset()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 081f6f6..f0e2c93 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1322,4 +1322,26 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       sparkSession.sparkContext.conf.set(DEBUG_MODE, previousValue)
     }
   }
+
+  test("SPARK-18464: support old table which doesn't store schema in table properties") {
+    withTable("old") {
+      withTempPath { path =>
+        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
+        val tableDesc = CatalogTable(
+          identifier = TableIdentifier("old", Some("default")),
+          tableType = CatalogTableType.EXTERNAL,
+          storage = CatalogStorageFormat.empty.copy(
+            properties = Map("path" -> path.getAbsolutePath)
+          ),
+          schema = new StructType(),
+          properties = Map(
+            HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
+        hiveClient.createTable(tableDesc, ignoreIfExists = false)
+
+        checkAnswer(spark.table("old"), Row(1, "a"))
+
+        checkAnswer(sql("DESC old"), Row("i", "int", null) :: Row("j", "string", null) :: Nil)
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 0053aa1..e2fcd2f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -62,7 +62,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
         spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true)
 
-        val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
+        val relation = spark.table("csv_table").queryExecution.analyzed.children.head
           .asInstanceOf[MetastoreRelation]
 
         val properties = relation.hiveQlTable.getParameters
@@ -80,7 +80,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
   test("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
-      spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).stats(conf).sizeInBytes
+      spark.table(tableName).queryExecution.analyzed.stats(conf).sizeInBytes
 
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -451,7 +451,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
       }
       // Table lookup will make the table cached.
-      catalog.lookupRelation(tableIndent)
+      spark.table(tableIndent)
       statsBeforeUpdate = catalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
 
@@ -461,7 +461,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       } else {
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
       }
-      catalog.lookupRelation(tableIndent)
+      spark.table(tableIndent)
       statsAfterUpdate = catalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 953e291..104b525 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
@@ -513,8 +514,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       isDataSourceTable: Boolean,
       format: String,
       userSpecifiedLocation: Option[String] = None): Unit = {
-    val relation = EliminateSubqueryAliases(
-      sessionState.catalog.lookupRelation(TableIdentifier(tableName)))
+    var relation: LogicalPlan = null
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+      HiveUtils.CONVERT_METASTORE_ORC.key -> "false") {
+      relation = EliminateSubqueryAliases(spark.table(tableName).queryExecution.analyzed)
+    }
     val catalogTable =
       sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
     relation match {
@@ -1021,13 +1026,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     // generates an invalid query plan.
     val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
     read.json(rdd).createOrReplaceTempView("data")
-    val originalConf = sessionState.conf.convertCTAS
-    setConf(SQLConf.CONVERT_CTAS, false)
 
-    try {
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
       sql("CREATE TABLE explodeTest (key bigInt)")
       table("explodeTest").queryExecution.analyzed match {
-        case metastoreRelation: MetastoreRelation => // OK
+        case SubqueryAlias(_, r: MetastoreRelation, _) => // OK
         case _ =>
           fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
       }
@@ -1040,8 +1043,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
       sql("DROP TABLE explodeTest")
       dropTempTable("data")
-    } finally {
-      setConf(SQLConf.CONVERT_CTAS, originalConf)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org