You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/02/28 17:24:42 UTC

[2/2] spark git commit: [SPARK-19678][SQL] remove MetastoreRelation

[SPARK-19678][SQL] remove MetastoreRelation

## What changes were proposed in this pull request?

`MetastoreRelation` is used to represent the table relation for Hive tables, and it provides some Hive-related information. Currently we resolve `SimpleCatalogRelation` to `MetastoreRelation` for Hive tables, which is unnecessary because the two are essentially the same. This PR merges `SimpleCatalogRelation` and `MetastoreRelation` into a single `CatalogRelation`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #17015 from cloud-fan/table-relation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c7fc30b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c7fc30b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c7fc30b

Branch: refs/heads/master
Commit: 7c7fc30b4ae8e4ebd4ededf92240fed10481f2dd
Parents: b405466
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Feb 28 09:24:36 2017 -0800
Committer: Xiao Li <ga...@gmail.com>
Committed: Tue Feb 28 09:24:36 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/analysis/CheckAnalysis.scala   |   2 -
 .../sql/catalyst/catalog/SessionCatalog.scala   |   7 +-
 .../spark/sql/catalyst/catalog/interface.scala  |  68 ++++---
 .../apache/spark/sql/catalyst/identifiers.scala |   4 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  |   8 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |   8 +-
 .../execution/OptimizeMetadataOnlyQuery.scala   |   8 +-
 .../command/AnalyzeColumnCommand.scala          |  49 ++---
 .../execution/command/AnalyzeTableCommand.scala |  78 ++++----
 .../datasources/DataSourceStrategy.scala        |  29 +--
 .../spark/sql/execution/datasources/rules.scala |   2 +-
 .../spark/sql/StatisticsCollectionSuite.scala   |   2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 130 ++++++-------
 .../spark/sql/hive/HiveSessionState.scala       |   2 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  82 ++++++---
 .../spark/sql/hive/MetastoreRelation.scala      | 179 ------------------
 .../org/apache/spark/sql/hive/TableReader.scala |  27 +--
 .../sql/hive/execution/HiveTableScanExec.scala  |  87 +++++----
 .../hive/execution/InsertIntoHiveTable.scala    |  47 +++--
 .../spark/sql/hive/MetastoreRelationSuite.scala |  55 ------
 .../apache/spark/sql/hive/StatisticsSuite.scala | 183 +++++++------------
 .../sql/hive/execution/HiveComparisonTest.scala |   4 +-
 .../sql/hive/execution/HiveTableScanSuite.scala |  12 +-
 .../spark/sql/hive/execution/PruningSuite.scala |   4 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |  28 ++-
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |   5 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |   4 +-
 27 files changed, 442 insertions(+), 672 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 36ab8b8..7529f90 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -18,10 +18,8 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.catalog.SimpleCatalogRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.plans.UsingJoin
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0230626..0673489 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -592,7 +592,12 @@ class SessionCatalog(
             child = parser.parsePlan(viewText))
           SubqueryAlias(table, child, Some(name.copy(table = table, database = Some(db))))
         } else {
-          SubqueryAlias(table, SimpleCatalogRelation(metadata), None)
+          val tableRelation = CatalogRelation(
+            metadata,
+            // we assume all the columns are nullable.
+            metadata.dataSchema.asNullable.toAttributes,
+            metadata.partitionSchema.asNullable.toAttributes)
+          SubqueryAlias(table, tableRelation, None)
         }
       } else {
         SubqueryAlias(table, tempTables(table), None)

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2b3b575..cb93902 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -19,10 +19,11 @@ package org.apache.spark.sql.catalyst.catalog
 
 import java.util.Date
 
-import scala.collection.mutable
+import com.google.common.base.Objects
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -349,36 +350,43 @@ object CatalogTypes {
 
 
 /**
- * An interface that is implemented by logical plans to return the underlying catalog table.
- * If we can in the future consolidate SimpleCatalogRelation and MetastoreRelation, we should
- * probably remove this interface.
+ * A [[LogicalPlan]] that represents a table.
  */
-trait CatalogRelation {
-  def catalogTable: CatalogTable
-  def output: Seq[Attribute]
-}
+case class CatalogRelation(
+    tableMeta: CatalogTable,
+    dataCols: Seq[Attribute],
+    partitionCols: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
+  assert(tableMeta.identifier.database.isDefined)
+  assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
+  assert(tableMeta.dataSchema.sameType(dataCols.toStructType))
+
+  // The partition column should always appear after data columns.
+  override def output: Seq[Attribute] = dataCols ++ partitionCols
+
+  def isPartitioned: Boolean = partitionCols.nonEmpty
+
+  override def equals(relation: Any): Boolean = relation match {
+    case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
+    case _ => false
+  }
 
+  override def hashCode(): Int = {
+    Objects.hashCode(tableMeta.identifier, output)
+  }
 
-/**
- * A [[LogicalPlan]] that wraps [[CatalogTable]].
- *
- * Note that in the future we should consolidate this and HiveCatalogRelation.
- */
-case class SimpleCatalogRelation(
-    metadata: CatalogTable)
-  extends LeafNode with CatalogRelation {
-
-  override def catalogTable: CatalogTable = metadata
-
-  override lazy val resolved: Boolean = false
-
-  override val output: Seq[Attribute] = {
-    val (partCols, dataCols) = metadata.schema.toAttributes
-      // Since data can be dumped in randomly with no validation, everything is nullable.
-      .map(_.withNullability(true).withQualifier(Some(metadata.identifier.table)))
-      .partition { a =>
-        metadata.partitionColumnNames.contains(a.name)
-      }
-    dataCols ++ partCols
+  /** Only compare table identifier. */
+  override lazy val cleanArgs: Seq[Any] = Seq(tableMeta.identifier)
+
+  override def computeStats(conf: CatalystConf): Statistics = {
+    // For data source tables, we will create a `LogicalRelation` and won't call this method, for
+    // hive serde tables, we will always generate a statistics.
+    // TODO: unify the table stats generation.
+    tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
+      throw new IllegalStateException("table stats must be specified.")
+    }
   }
+
+  override def newInstance(): LogicalPlan = copy(
+    dataCols = dataCols.map(_.newInstance()),
+    partitionCols = partitionCols.map(_.newInstance()))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 26697e9..a3cc452 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -63,7 +63,9 @@ case class TableIdentifier(table: String, database: Option[String])
 }
 
 /** A fully qualified identifier for a table (i.e., database.tableName) */
-case class QualifiedTableName(database: String, name: String)
+case class QualifiedTableName(database: String, name: String) {
+  override def toString: String = s"$database.$name"
+}
 
 object TableIdentifier {
   def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 4443432..a755231 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -433,15 +433,15 @@ class SessionCatalogSuite extends PlanTest {
     sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
     sessionCatalog.setCurrentDatabase("db2")
     // If we explicitly specify the database, we'll look up the relation in that database
-    assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
-      == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
+    assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
+      .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
     // Otherwise, we'll first look up a temporary table with the same name
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
       == SubqueryAlias("tbl1", tempTable1, None))
     // Then, if that does not exist, look up the relation in the current database
     sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
-    assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
-      == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
+    assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")).children.head
+      .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
   }
 
   test("look up view relation") {

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3939251..49e85dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -349,8 +349,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         // Get all input data source or hive relations of the query.
         val srcRelations = df.logicalPlan.collect {
           case LogicalRelation(src: BaseRelation, _, _) => src
-          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
-            relation.catalogTable.identifier
+          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
+            relation.tableMeta.identifier
         }
 
         val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
@@ -360,8 +360,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
           // check hive table relation when overwrite mode
-          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable)
-            && srcRelations.contains(relation.catalogTable.identifier) =>
+          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
+            && srcRelations.contains(relation.tableMeta.identifier) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
           case _ => // OK

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index b8ac070..b02edd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -102,8 +102,8 @@ case class OptimizeMetadataOnlyQuery(
             LocalRelation(partAttrs, partitionData.map(_.values))
 
           case relation: CatalogRelation =>
-            val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
-            val partitionData = catalog.listPartitions(relation.catalogTable.identifier).map { p =>
+            val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+            val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p =>
               InternalRow.fromSeq(partAttrs.map { attr =>
                 // TODO: use correct timezone for partition values.
                 Cast(Literal(p.spec(attr.name)), attr.dataType,
@@ -135,8 +135,8 @@ case class OptimizeMetadataOnlyQuery(
         val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
         Some(AttributeSet(partAttrs), l)
 
-      case relation: CatalogRelation if relation.catalogTable.partitionColumnNames.nonEmpty =>
-        val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
+      case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+        val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
         Some(AttributeSet(partAttrs), relation)
 
       case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index d024a36..b89014e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -17,15 +17,12 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 
 /**
@@ -40,60 +37,40 @@ case class AnalyzeColumnCommand(
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation =
-      EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
-
-    // Compute total size
-    val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {
-      case catalogRel: CatalogRelation =>
-        // This is a Hive serde format table
-        (catalogRel.catalogTable,
-          AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
-
-      case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
-        // This is a data source format table
-        (logicalRel.catalogTable.get,
-          AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
-
-      case otherRelation =>
-        throw new AnalysisException("ANALYZE TABLE is not supported for " +
-          s"${otherRelation.nodeName}.")
+    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+    if (tableMeta.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
     }
+    val sizeInBytes = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)
 
     // Compute stats for each column
-    val (rowCount, newColStats) =
-      AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames)
+    val (rowCount, newColStats) = computeColumnStats(sparkSession, tableIdentWithDB, columnNames)
 
     // We also update table-level stats in order to keep them consistent with column-level stats.
     val statistics = CatalogStatistics(
       sizeInBytes = sizeInBytes,
       rowCount = Some(rowCount),
       // Newly computed column stats should override the existing ones.
-      colStats = catalogTable.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
+      colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
 
-    sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
+    sessionState.catalog.alterTable(tableMeta.copy(stats = Some(statistics)))
 
     // Refresh the cached data source table in the catalog.
     sessionState.catalog.refreshTable(tableIdentWithDB)
 
     Seq.empty[Row]
   }
-}
-
-object AnalyzeColumnCommand extends Logging {
 
   /**
    * Compute stats for the given columns.
    * @return (row count, map from column name to ColumnStats)
-   *
-   * This is visible for testing.
    */
-  def computeColumnStats(
+  private def computeColumnStats(
       sparkSession: SparkSession,
-      tableName: String,
-      relation: LogicalPlan,
+      tableIdent: TableIdentifier,
       columnNames: Seq[String]): (Long, Map[String, ColumnStat]) = {
 
+    val relation = sparkSession.table(tableIdent).logicalPlan
     // Resolve the column names and dedup using AttributeSet
     val resolver = sparkSession.sessionState.conf.resolver
     val attributesToAnalyze = AttributeSet(columnNames.map { col =>
@@ -105,7 +82,7 @@ object AnalyzeColumnCommand extends Logging {
     attributesToAnalyze.foreach { attr =>
       if (!ColumnStat.supportsType(attr.dataType)) {
         throw new AnalysisException(
-          s"Column ${attr.name} in table $tableName is of type ${attr.dataType}, " +
+          s"Column ${attr.name} in table $tableIdent is of type ${attr.dataType}, " +
             "and Spark does not support statistics collection on this column type.")
       }
     }
@@ -116,7 +93,7 @@ object AnalyzeColumnCommand extends Logging {
     // The layout of each struct follows the layout of the ColumnStats.
     val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
     val expressions = Count(Literal(1)).toAggregateExpression() +:
-        attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))
+      attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))
 
     val namedExpressions = expressions.map(e => Alias(e, e.toString)())
     val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head()

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 30b6cc7..d2ea0cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -22,11 +22,9 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.internal.SessionState
 
 
@@ -41,53 +39,39 @@ case class AnalyzeTableCommand(
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation =
-      EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
-
-    relation match {
-      case relation: CatalogRelation =>
-        updateTableStats(relation.catalogTable,
-          AnalyzeTableCommand.calculateTotalSize(sessionState, relation.catalogTable))
-
-      // data source tables have been converted into LogicalRelations
-      case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
-        updateTableStats(logicalRel.catalogTable.get,
-          AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
-
-      case otherRelation =>
-        throw new AnalysisException("ANALYZE TABLE is not supported for " +
-          s"${otherRelation.nodeName}.")
+    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+    if (tableMeta.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
     }
+    val newTotalSize = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)
 
-    def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
-      val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
-      val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
-      var newStats: Option[CatalogStatistics] = None
-      if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-        newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
-      }
-      // We only set rowCount when noscan is false, because otherwise:
-      // 1. when total size is not changed, we don't need to alter the table;
-      // 2. when total size is changed, `oldRowCount` becomes invalid.
-      // This is to make sure that we only record the right statistics.
-      if (!noscan) {
-        val newRowCount = Dataset.ofRows(sparkSession, relation).count()
-        if (newRowCount >= 0 && newRowCount != oldRowCount) {
-          newStats = if (newStats.isDefined) {
-            newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
-          } else {
-            Some(CatalogStatistics(
-              sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
-          }
+    val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
+    val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+    var newStats: Option[CatalogStatistics] = None
+    if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+      newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
+    }
+    // We only set rowCount when noscan is false, because otherwise:
+    // 1. when total size is not changed, we don't need to alter the table;
+    // 2. when total size is changed, `oldRowCount` becomes invalid.
+    // This is to make sure that we only record the right statistics.
+    if (!noscan) {
+      val newRowCount = sparkSession.table(tableIdentWithDB).count()
+      if (newRowCount >= 0 && newRowCount != oldRowCount) {
+        newStats = if (newStats.isDefined) {
+          newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
+        } else {
+          Some(CatalogStatistics(
+            sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
         }
       }
-      // Update the metastore if the above statistics of the table are different from those
-      // recorded in the metastore.
-      if (newStats.isDefined) {
-        sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
-        // Refresh the cached data source table in the catalog.
-        sessionState.catalog.refreshTable(tableIdentWithDB)
-      }
+    }
+    // Update the metastore if the above statistics of the table are different from those
+    // recorded in the metastore.
+    if (newStats.isDefined) {
+      sessionState.catalog.alterTable(tableMeta.copy(stats = newStats))
+      // Refresh the cached data source table in the catalog.
+      sessionState.catalog.refreshTable(tableIdentWithDB)
     }
 
     Seq.empty[Row]

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f429232..f694a0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -208,16 +208,17 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
 
 /**
- * Replaces [[SimpleCatalogRelation]] with data source table if its table provider is not hive.
+ * Replaces [[CatalogRelation]] with data source table if its table provider is not hive.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
+  private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
+    val table = r.tableMeta
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
     val cache = sparkSession.sessionState.catalog.tableRelationCache
     val withHiveSupport =
       sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"
 
-    cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+    val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
         val pathOption = table.storage.locationUri.map("path" -> _)
         val dataSource =
@@ -233,19 +234,25 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
             // TODO: improve `InMemoryCatalog` and remove this limitation.
             catalogTable = if (withHiveSupport) Some(table) else None)
 
-        LogicalRelation(dataSource.resolveRelation(checkFilesExist = false),
+        LogicalRelation(
+          dataSource.resolveRelation(checkFilesExist = false),
           catalogTable = Some(table))
       }
-    })
+    }).asInstanceOf[LogicalRelation]
+
+    // It's possible that the table schema is empty and need to be inferred at runtime. We should
+    // not specify expected outputs for this case.
+    val expectedOutputs = if (r.output.isEmpty) None else Some(r.output)
+    plan.copy(expectedOutputAttributes = expectedOutputs)
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
-        if DDLUtils.isDatasourceTable(s.metadata) =>
-      i.copy(table = readDataSourceTable(s.metadata))
+    case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
+        if DDLUtils.isDatasourceTable(r.tableMeta) =>
+      i.copy(table = readDataSourceTable(r))
 
-    case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-      readDataSourceTable(s.metadata)
+    case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) =>
+      readDataSourceTable(r)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index e7a59d4..4d781b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -379,7 +379,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
     case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
       table match {
         case relation: CatalogRelation =>
-          val metadata = relation.catalogTable
+          val metadata = relation.tableMeta
           preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
         case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
           val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index b38bbd8..bbb31db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -306,7 +306,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     // Analyze only one column.
     sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
     val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
-      case catalogRel: CatalogRelation => (catalogRel, catalogRel.catalogTable)
+      case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
       case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
     }.head
     val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 677da0d..151a69a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.net.URI
+
 import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
@@ -26,6 +28,7 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -71,10 +74,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   private def getCached(
       tableIdentifier: QualifiedTableName,
       pathsInMetastore: Seq[Path],
-      metastoreRelation: MetastoreRelation,
       schemaInMetastore: StructType,
       expectedFileFormat: Class[_ <: FileFormat],
-      expectedBucketSpec: Option[BucketSpec],
       partitionSchema: Option[StructType]): Option[LogicalRelation] = {
 
     tableRelationCache.getIfPresent(tableIdentifier) match {
@@ -89,7 +90,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             val useCached =
               relation.location.rootPaths.toSet == pathsInMetastore.toSet &&
                 logical.schema.sameType(schemaInMetastore) &&
-                relation.bucketSpec == expectedBucketSpec &&
+                // We don't support Hive bucketed tables. This function `getCached` is only used for
+                // converting supported Hive tables to data source tables.
+                relation.bucketSpec.isEmpty &&
                 relation.partitionSchema == partitionSchema.getOrElse(StructType(Nil))
 
             if (useCached) {
@@ -100,52 +103,48 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               None
             }
           case _ =>
-            logWarning(
-              s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " +
-                s"should be stored as $expectedFileFormat. However, we are getting " +
-                s"a ${relation.fileFormat} from the metastore cache. This cached " +
-                s"entry will be invalidated.")
+            logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
+              s"However, we are getting a ${relation.fileFormat} from the metastore cache. " +
+              "This cached entry will be invalidated.")
             tableRelationCache.invalidate(tableIdentifier)
             None
         }
       case other =>
-        logWarning(
-          s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
-            s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " +
-            s"This cached entry will be invalidated.")
+        logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
+          s"However, we are getting a $other from the metastore cache. " +
+          "This cached entry will be invalidated.")
         tableRelationCache.invalidate(tableIdentifier)
         None
     }
   }
 
   private def convertToLogicalRelation(
-      metastoreRelation: MetastoreRelation,
+      relation: CatalogRelation,
       options: Map[String, String],
-      defaultSource: FileFormat,
       fileFormatClass: Class[_ <: FileFormat],
       fileType: String): LogicalRelation = {
-    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+    val metastoreSchema = relation.tableMeta.schema
     val tableIdentifier =
-      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
-    val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
+      QualifiedTableName(relation.tableMeta.database, relation.tableMeta.identifier.table)
 
     val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
-    val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
-      val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
-
+    val tablePath = new Path(new URI(relation.tableMeta.location))
+    val result = if (relation.isPartitioned) {
+      val partitionSchema = relation.tableMeta.partitionSchema
       val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
-        Seq(metastoreRelation.hiveQlTable.getDataLocation)
+        Seq(tablePath)
       } else {
         // By convention (for example, see CatalogFileIndex), the definition of a
         // partitioned table's paths depends on whether that table has any actual partitions.
         // Partitioned tables without partitions use the location of the table's base path.
         // Partitioned tables with partitions use the locations of those partitions' data
         // locations,_omitting_ the table's base path.
-        val paths = metastoreRelation.getHiveQlPartitions().map { p =>
-          new Path(p.getLocation)
-        }
+        val paths = sparkSession.sharedState.externalCatalog
+          .listPartitions(tableIdentifier.database, tableIdentifier.name)
+          .map(p => new Path(new URI(p.storage.locationUri.get)))
+
         if (paths.isEmpty) {
-          Seq(metastoreRelation.hiveQlTable.getDataLocation)
+          Seq(tablePath)
         } else {
           paths
         }
@@ -155,39 +154,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         val cached = getCached(
           tableIdentifier,
           rootPaths,
-          metastoreRelation,
           metastoreSchema,
           fileFormatClass,
-          bucketSpec,
           Some(partitionSchema))
 
         val logicalRelation = cached.getOrElse {
-          val sizeInBytes =
-            metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
+          val sizeInBytes = relation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
           val fileIndex = {
-            val index = new CatalogFileIndex(
-              sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+            val index = new CatalogFileIndex(sparkSession, relation.tableMeta, sizeInBytes)
             if (lazyPruningEnabled) {
               index
             } else {
               index.filterPartitions(Nil)  // materialize all the partitions in memory
             }
           }
-          val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
-          val dataSchema =
-            StructType(metastoreSchema
-              .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
 
-          val relation = HadoopFsRelation(
+          val fsRelation = HadoopFsRelation(
             location = fileIndex,
             partitionSchema = partitionSchema,
-            dataSchema = dataSchema,
-            bucketSpec = bucketSpec,
-            fileFormat = defaultSource,
+            dataSchema = relation.tableMeta.dataSchema,
+            // We don't support hive bucketed tables, only ones we write out.
+            bucketSpec = None,
+            fileFormat = fileFormatClass.newInstance(),
             options = options)(sparkSession = sparkSession)
 
-          val created = LogicalRelation(relation,
-            catalogTable = Some(metastoreRelation.catalogTable))
+          val created = LogicalRelation(fsRelation, catalogTable = Some(relation.tableMeta))
           tableRelationCache.put(tableIdentifier, created)
           created
         }
@@ -195,14 +186,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         logicalRelation
       })
     } else {
-      val rootPath = metastoreRelation.hiveQlTable.getDataLocation
+      val rootPath = tablePath
       withTableCreationLock(tableIdentifier, {
-        val cached = getCached(tableIdentifier,
+        val cached = getCached(
+          tableIdentifier,
           Seq(rootPath),
-          metastoreRelation,
           metastoreSchema,
           fileFormatClass,
-          bucketSpec,
           None)
         val logicalRelation = cached.getOrElse {
           val created =
@@ -210,11 +200,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               DataSource(
                 sparkSession = sparkSession,
                 paths = rootPath.toString :: Nil,
-                userSpecifiedSchema = Some(metastoreRelation.schema),
-                bucketSpec = bucketSpec,
+                userSpecifiedSchema = Some(metastoreSchema),
+                // We don't support hive bucketed tables, only ones we write out.
+                bucketSpec = None,
                 options = options,
                 className = fileType).resolveRelation(),
-              catalogTable = Some(metastoreRelation.catalogTable))
+              catalogTable = Some(relation.tableMeta))
 
           tableRelationCache.put(tableIdentifier, created)
           created
@@ -223,7 +214,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         logicalRelation
       })
     }
-    result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
+    result.copy(expectedOutputAttributes = Some(relation.output))
   }
 
   /**
@@ -231,33 +222,32 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
    * data source relations for better performance.
    */
   object ParquetConversions extends Rule[LogicalPlan] {
-    private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = {
-      relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") &&
+    private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
+      relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
         sessionState.convertMetastoreParquet
     }
 
-    private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = {
-      val defaultSource = new ParquetFileFormat()
+    private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
       val fileFormatClass = classOf[ParquetFileFormat]
-
       val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging
       val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
 
-      convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet")
+      convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
     }
 
     override def apply(plan: LogicalPlan): LogicalPlan = {
       plan transformUp {
         // Write path
-        case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
+        case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
             // Inserting into partitioned table is not supported in Parquet data source (yet).
-            if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+            if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+              !r.isPartitioned && shouldConvertMetastoreParquet(r) =>
           InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
 
         // Read path
-        case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
-          val parquetRelation = convertToParquetRelation(relation)
-          SubqueryAlias(relation.tableName, parquetRelation, None)
+        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
+            shouldConvertMetastoreParquet(relation) =>
+          convertToParquetRelation(relation)
       }
     }
   }
@@ -267,31 +257,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
    * for better performance.
    */
   object OrcConversions extends Rule[LogicalPlan] {
-    private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = {
-      relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") &&
+    private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
+      relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
         sessionState.convertMetastoreOrc
     }
 
-    private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = {
-      val defaultSource = new OrcFileFormat()
+    private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {
       val fileFormatClass = classOf[OrcFileFormat]
       val options = Map[String, String]()
 
-      convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc")
+      convertToLogicalRelation(relation, options, fileFormatClass, "orc")
     }
 
     override def apply(plan: LogicalPlan): LogicalPlan = {
       plan transformUp {
         // Write path
-        case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
+        case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
             // Inserting into partitioned table is not supported in Orc data source (yet).
-            if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+            if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+              !r.isPartitioned && shouldConvertMetastoreOrc(r) =>
           InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
 
         // Read path
-        case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
-          val orcRelation = convertToOrcRelation(relation)
-          SubqueryAlias(relation.tableName, orcRelation, None)
+        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
+            shouldConvertMetastoreOrc(relation) =>
+          convertToOrcRelation(relation)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 273cf85..5a08a6b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -62,10 +62,10 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       override val extendedResolutionRules =
         new ResolveHiveSerdeTable(sparkSession) ::
         new FindDataSourceTable(sparkSession) ::
-        new FindHiveSerdeTable(sparkSession) ::
         new ResolveSQLOnFile(sparkSession) :: Nil
 
       override val postHocResolutionRules =
+        new DetermineTableStats(sparkSession) ::
         catalog.ParquetConversions ::
         catalog.OrcConversions ::
         PreprocessTableCreation(sparkSession) ::

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index f45532c..624cfa2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,8 +17,14 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.IOException
+import java.net.URI
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.StatsSetupConst
+
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
@@ -91,18 +97,56 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 
       // Infers the schema, if empty, because the schema could be determined by Hive
       // serde.
-      val catalogTable = if (query.isEmpty) {
-        val withSchema = HiveUtils.inferSchema(withStorage)
-        if (withSchema.schema.length <= 0) {
+      val withSchema = if (query.isEmpty) {
+        val inferred = HiveUtils.inferSchema(withStorage)
+        if (inferred.schema.length <= 0) {
           throw new AnalysisException("Unable to infer the schema. " +
-            s"The schema specification is required to create the table ${withSchema.identifier}.")
+            s"The schema specification is required to create the table ${inferred.identifier}.")
         }
-        withSchema
+        inferred
       } else {
         withStorage
       }
 
-      c.copy(tableDesc = catalogTable)
+      c.copy(tableDesc = withSchema)
+  }
+}
+
+class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case relation: CatalogRelation
+        if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
+      val table = relation.tableMeta
+      // TODO: check if this estimate is valid for tables after partition pruning.
+      // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
+      // relatively cheap if parameters for the table are populated into the metastore.
+      // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+      // (see StatsSetupConst in Hive) that we can look at in the future.
+      // When the table is external, `totalSize` is always zero, which will influence the join
+      // strategy, so when `totalSize` is zero, use `rawDataSize` instead.
+      val totalSize = table.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+      val rawDataSize = table.properties.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+      val sizeInBytes = if (totalSize.isDefined && totalSize.get > 0) {
+        totalSize.get
+      } else if (rawDataSize.isDefined && rawDataSize.get > 0) {
+        rawDataSize.get
+      } else if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+        try {
+          val hadoopConf = session.sessionState.newHadoopConf()
+          val tablePath = new Path(new URI(table.location))
+          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+          fs.getContentSummary(tablePath).getLength
+        } catch {
+          case e: IOException =>
+            logWarning("Failed to get table size from hdfs.", e)
+            session.sessionState.conf.defaultSizeInBytes
+        }
+      } else {
+        session.sessionState.conf.defaultSizeInBytes
+      }
+
+      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+      relation.copy(tableMeta = withStats)
   }
 }
 
@@ -114,8 +158,9 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
  */
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
-      InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+    case InsertIntoTable(relation: CatalogRelation, partSpec, query, overwrite, ifNotExists)
+        if DDLUtils.isHiveTable(relation.tableMeta) =>
+      InsertIntoHiveTable(relation.tableMeta, partSpec, query, overwrite, ifNotExists)
 
     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
       CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
@@ -125,21 +170,6 @@ object HiveAnalysis extends Rule[LogicalPlan] {
   }
 }
 
-/**
- * Replaces `SimpleCatalogRelation` with [[MetastoreRelation]] if its table provider is hive.
- */
-class FindHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
-        if DDLUtils.isHiveTable(s.metadata) =>
-      i.copy(table =
-        MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session))
-
-    case s: SimpleCatalogRelation if DDLUtils.isHiveTable(s.metadata) =>
-      MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session)
-  }
-}
-
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SparkPlanner =>
@@ -161,10 +191,10 @@ private[hive] trait HiveStrategies {
    */
   object HiveTableScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) =>
+      case PhysicalOperation(projectList, predicates, relation: CatalogRelation) =>
         // Filter out all predicates that only deal with partition keys, these are given to the
         // hive table scan operator to be used for partition pruning.
-        val partitionKeyIds = AttributeSet(relation.partitionKeys)
+        val partitionKeyIds = AttributeSet(relation.partitionCols)
         val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
           !predicate.references.isEmpty &&
           predicate.references.subsetOf(partitionKeyIds)

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
deleted file mode 100644
index 97b1207..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.IOException
-
-import com.google.common.base.Objects
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
-import org.apache.hadoop.hive.ql.plan.TableDesc
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.hive.client.HiveClientImpl
-import org.apache.spark.sql.types.StructField
-
-
-private[hive] case class MetastoreRelation(
-    databaseName: String,
-    tableName: String)
-    (val catalogTable: CatalogTable,
-     @transient private val sparkSession: SparkSession)
-  extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {
-
-  override def equals(other: Any): Boolean = other match {
-    case relation: MetastoreRelation =>
-      databaseName == relation.databaseName &&
-        tableName == relation.tableName &&
-        output == relation.output
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    Objects.hashCode(databaseName, tableName, output)
-  }
-
-  override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
-
-  @transient val hiveQlTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable)
-
-  @transient override def computeStats(conf: CatalystConf): Statistics = {
-    catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
-      sizeInBytes = {
-        val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
-        val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
-        // TODO: check if this estimate is valid for tables after partition pruning.
-        // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-        // relatively cheap if parameters for the table are populated into the metastore.
-        // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
-        // (see StatsSetupConst in Hive) that we can look at in the future.
-        BigInt(
-          // When table is external,`totalSize` is always zero, which will influence join strategy
-          // so when `totalSize` is zero, use `rawDataSize` instead
-          // when `rawDataSize` is also zero, use `HiveExternalCatalog.STATISTICS_TOTAL_SIZE`,
-          // which is generated by analyze command.
-          if (totalSize != null && totalSize.toLong > 0L) {
-            totalSize.toLong
-          } else if (rawDataSize != null && rawDataSize.toLong > 0) {
-            rawDataSize.toLong
-          } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-            try {
-              val hadoopConf = sparkSession.sessionState.newHadoopConf()
-              val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
-              fs.getContentSummary(hiveQlTable.getPath).getLength
-            } catch {
-              case e: IOException =>
-                logWarning("Failed to get table size from hdfs.", e)
-                sparkSession.sessionState.conf.defaultSizeInBytes
-            }
-          } else {
-            sparkSession.sessionState.conf.defaultSizeInBytes
-          })
-      }
-    ))
-  }
-
-  // When metastore partition pruning is turned off, we cache the list of all partitions to
-  // mimic the behavior of Spark < 1.5
-  private lazy val allPartitions: Seq[CatalogTablePartition] = {
-    sparkSession.sharedState.externalCatalog.listPartitions(
-      catalogTable.database,
-      catalogTable.identifier.table)
-  }
-
-  def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
-    val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
-      sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
-        catalogTable.database,
-        catalogTable.identifier.table,
-        predicates)
-    } else {
-      allPartitions
-    }
-
-    rawPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable))
-  }
-
-  /** Only compare database and tablename, not alias. */
-  override def sameResult(plan: LogicalPlan): Boolean = {
-    plan.canonicalized match {
-      case mr: MetastoreRelation =>
-        mr.databaseName == databaseName && mr.tableName == tableName
-      case _ => false
-    }
-  }
-
-  val tableDesc = new TableDesc(
-    hiveQlTable.getInputFormatClass,
-    // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
-    // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
-    // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
-    // HiveSequenceFileOutputFormat.
-    hiveQlTable.getOutputFormatClass,
-    hiveQlTable.getMetadata
-  )
-
-  implicit class SchemaAttribute(f: StructField) {
-    def toAttribute: AttributeReference = AttributeReference(
-      f.name,
-      f.dataType,
-      // Since data can be dumped in randomly with no validation, everything is nullable.
-      nullable = true
-    )(qualifier = Some(tableName))
-  }
-
-  /** PartitionKey attributes */
-  val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute)
-
-  /** Non-partitionKey attributes */
-  val dataColKeys = catalogTable.schema
-    .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
-    .map(_.toAttribute)
-
-  val output = dataColKeys ++ partitionKeys
-
-  /** An attribute map that can be used to lookup original attributes based on expression id. */
-  val attributeMap = AttributeMap(output.map(o => (o, o)))
-
-  /** An attribute map for determining the ordinal for non-partition columns. */
-  val columnOrdinals = AttributeMap(dataColKeys.zipWithIndex)
-
-  override def inputFiles: Array[String] = {
-    val partLocations = allPartitions
-      .flatMap(_.storage.locationUri)
-      .toArray
-    if (partLocations.nonEmpty) {
-      partLocations
-    } else {
-      Array(
-        catalogTable.storage.locationUri.getOrElse(
-          sys.error(s"Could not get the location of ${catalogTable.qualifiedName}.")))
-    }
-  }
-
-  override def newInstance(): MetastoreRelation = {
-    MetastoreRelation(databaseName, tableName)(catalogTable, sparkSession)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index d48702b..16c1103 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -61,7 +61,8 @@ private[hive] sealed trait TableReader {
 private[hive]
 class HadoopTableReader(
     @transient private val attributes: Seq[Attribute],
-    @transient private val relation: MetastoreRelation,
+    @transient private val partitionKeys: Seq[Attribute],
+    @transient private val tableDesc: TableDesc,
     @transient private val sparkSession: SparkSession,
     hadoopConf: Configuration)
   extends TableReader with Logging {
@@ -88,7 +89,7 @@ class HadoopTableReader(
   override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
     makeRDDForTable(
       hiveTable,
-      Utils.classForName(relation.tableDesc.getSerdeClassName).asInstanceOf[Class[Deserializer]],
+      Utils.classForName(tableDesc.getSerdeClassName).asInstanceOf[Class[Deserializer]],
       filterOpt = None)
 
   /**
@@ -110,7 +111,7 @@ class HadoopTableReader(
 
     // Create local references to member variables, so that the entire `this` object won't be
     // serialized in the closure below.
-    val tableDesc = relation.tableDesc
+    val localTableDesc = tableDesc
     val broadcastedHadoopConf = _broadcastedHadoopConf
 
     val tablePath = hiveTable.getPath
@@ -119,7 +120,7 @@ class HadoopTableReader(
     // logDebug("Table input: %s".format(tablePath))
     val ifc = hiveTable.getInputFormatClass
       .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
-    val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
+    val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
 
     val attrsWithIndex = attributes.zipWithIndex
     val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
@@ -127,7 +128,7 @@ class HadoopTableReader(
     val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
       val hconf = broadcastedHadoopConf.value.value
       val deserializer = deserializerClass.newInstance()
-      deserializer.initialize(hconf, tableDesc.getProperties)
+      deserializer.initialize(hconf, localTableDesc.getProperties)
       HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
     }
 
@@ -212,8 +213,6 @@ class HadoopTableReader(
         partCols.map(col => new String(partSpec.get(col))).toArray
       }
 
-      // Create local references so that the outer object isn't serialized.
-      val tableDesc = relation.tableDesc
       val broadcastedHiveConf = _broadcastedHadoopConf
       val localDeserializer = partDeserializer
       val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
@@ -222,12 +221,12 @@ class HadoopTableReader(
       // Attached indices indicate the position of each attribute in the output schema.
       val (partitionKeyAttrs, nonPartitionKeyAttrs) =
         attributes.zipWithIndex.partition { case (attr, _) =>
-          relation.partitionKeys.contains(attr)
+          partitionKeys.contains(attr)
         }
 
       def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = {
         partitionKeyAttrs.foreach { case (attr, ordinal) =>
-          val partOrdinal = relation.partitionKeys.indexOf(attr)
+          val partOrdinal = partitionKeys.indexOf(attr)
           row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
         }
       }
@@ -235,9 +234,11 @@ class HadoopTableReader(
       // Fill all partition keys to the given MutableRow object
       fillPartitionKeys(partValues, mutableRow)
 
-      val tableProperties = relation.tableDesc.getProperties
+      val tableProperties = tableDesc.getProperties
 
-      createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
+      // Create local references so that the outer object isn't serialized.
+      val localTableDesc = tableDesc
+      createHadoopRdd(localTableDesc, inputPathStr, ifc).mapPartitions { iter =>
         val hconf = broadcastedHiveConf.value.value
         val deserializer = localDeserializer.newInstance()
         // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema
@@ -251,8 +252,8 @@ class HadoopTableReader(
         }
         deserializer.initialize(hconf, props)
         // get the table deserializer
-        val tableSerDe = tableDesc.getDeserializerClass.newInstance()
-        tableSerDe.initialize(hconf, tableDesc.getProperties)
+        val tableSerDe = localTableDesc.getDeserializerClass.newInstance()
+        tableSerDe.initialize(hconf, localTableDesc.getProperties)
 
         // fill the non partition key attributes
         HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs,

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 140c352..14b9565 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
+import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
@@ -29,10 +30,12 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.hive._
+import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.types.{BooleanType, DataType}
 import org.apache.spark.util.Utils
 
@@ -46,12 +49,12 @@ import org.apache.spark.util.Utils
 private[hive]
 case class HiveTableScanExec(
     requestedAttributes: Seq[Attribute],
-    relation: MetastoreRelation,
+    relation: CatalogRelation,
     partitionPruningPred: Seq[Expression])(
     @transient private val sparkSession: SparkSession)
   extends LeafExecNode {
 
-  require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
+  require(partitionPruningPred.isEmpty || relation.isPartitioned,
     "Partition pruning predicates only supported for partitioned tables.")
 
   override lazy val metrics = Map(
@@ -60,42 +63,54 @@ case class HiveTableScanExec(
   override def producedAttributes: AttributeSet = outputSet ++
     AttributeSet(partitionPruningPred.flatMap(_.references))
 
-  // Retrieve the original attributes based on expression ID so that capitalization matches.
-  val attributes = requestedAttributes.map(relation.attributeMap)
+  private val originalAttributes = AttributeMap(relation.output.map(a => a -> a))
+
+  override val output: Seq[Attribute] = {
+    // Retrieve the original attributes based on expression ID so that capitalization matches.
+    requestedAttributes.map(originalAttributes)
+  }
 
   // Bind all partition key attribute references in the partition pruning predicate for later
   // evaluation.
-  private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
+  private val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
     require(
       pred.dataType == BooleanType,
       s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
 
-    BindReferences.bindReference(pred, relation.partitionKeys)
+    BindReferences.bindReference(pred, relation.partitionCols)
   }
 
   // Create a local copy of hadoopConf, so that scan-specific modifications should not impact
   // other queries
-  @transient
-  private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf()
+  @transient private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+
+  @transient private val hiveQlTable = HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private val tableDesc = new TableDesc(
+    hiveQlTable.getInputFormatClass,
+    hiveQlTable.getOutputFormatClass,
+    hiveQlTable.getMetadata)
 
   // append columns ids and names before broadcast
   addColumnMetadataToConf(hadoopConf)
 
-  @transient
-  private[this] val hadoopReader =
-    new HadoopTableReader(attributes, relation, sparkSession, hadoopConf)
+  @transient private val hadoopReader = new HadoopTableReader(
+    output,
+    relation.partitionCols,
+    tableDesc,
+    sparkSession,
+    hadoopConf)
 
-  private[this] def castFromString(value: String, dataType: DataType) = {
+  private def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
   }
 
   private def addColumnMetadataToConf(hiveConf: Configuration) {
     // Specifies needed column IDs for those non-partitioning columns.
-    val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer)
 
-    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, attributes.map(_.name))
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name))
 
-    val tableDesc = relation.tableDesc
     val deserializer = tableDesc.getDeserializerClass.newInstance
     deserializer.initialize(hiveConf, tableDesc.getProperties)
 
@@ -113,7 +128,7 @@ case class HiveTableScanExec(
       .mkString(",")
 
     hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
-    hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataColKeys.map(_.name).mkString(","))
+    hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(","))
   }
 
   /**
@@ -126,7 +141,7 @@ case class HiveTableScanExec(
     boundPruningPred match {
       case None => partitions
       case Some(shouldKeep) => partitions.filter { part =>
-        val dataTypes = relation.partitionKeys.map(_.dataType)
+        val dataTypes = relation.partitionCols.map(_.dataType)
         val castedValues = part.getValues.asScala.zip(dataTypes)
           .map { case (value, dataType) => castFromString(value, dataType) }
 
@@ -138,27 +153,35 @@ case class HiveTableScanExec(
     }
   }
 
+  // exposed for tests
+  @transient lazy val rawPartitions = {
+    val prunedPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
+      // Retrieve the original attributes based on expression ID so that capitalization matches.
+      val normalizedFilters = partitionPruningPred.map(_.transform {
+        case a: AttributeReference => originalAttributes(a)
+      })
+      sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
+        relation.tableMeta.database,
+        relation.tableMeta.identifier.table,
+        normalizedFilters)
+    } else {
+      sparkSession.sharedState.externalCatalog.listPartitions(
+        relation.tableMeta.database,
+        relation.tableMeta.identifier.table)
+    }
+    prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable))
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
     // Using dummyCallSite, as getCallSite can turn out to be expensive
     // with multiple partitions.
-    val rdd = if (!relation.hiveQlTable.isPartitioned) {
+    val rdd = if (!relation.isPartitioned) {
       Utils.withDummyCallSite(sqlContext.sparkContext) {
-        hadoopReader.makeRDDForTable(relation.hiveQlTable)
+        hadoopReader.makeRDDForTable(hiveQlTable)
       }
     } else {
-      // The attribute name of predicate could be different than the one in schema in case of
-      // case insensitive, we should change them to match the one in schema, so we do not need to
-      // worry about case sensitivity anymore.
-      val normalizedFilters = partitionPruningPred.map { e =>
-        e transform {
-          case a: AttributeReference =>
-            a.withName(relation.output.find(_.semanticEquals(a)).get.name)
-        }
-      }
-
       Utils.withDummyCallSite(sqlContext.sparkContext) {
-        hadoopReader.makeRDDForPartitionedTable(
-          prunePartitions(relation.getHiveQlPartitions(normalizedFilters)))
+        hadoopReader.makeRDDForPartitionedTable(prunePartitions(rawPartitions))
       }
     }
     val numOutputRows = longMetric("numOutputRows")
@@ -174,8 +197,6 @@ case class HiveTableScanExec(
     }
   }
 
-  override def output: Seq[Attribute] = attributes
-
   override def sameResult(plan: SparkPlan): Boolean = plan match {
     case other: HiveTableScanExec =>
       val thisPredicates = partitionPruningPred.map(cleanExpression)

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 142f25d..f107149 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -29,16 +29,18 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.ErrorMsg
+import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.hive.client.HiveVersion
+import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion}
 import org.apache.spark.SparkException
 
 
@@ -52,9 +54,7 @@ import org.apache.spark.SparkException
  * In the future we should converge the write path for Hive with the normal data source write path,
  * as defined in `org.apache.spark.sql.execution.datasources.FileFormatWriter`.
  *
- * @param table the logical plan representing the table. In the future this should be a
- *              `org.apache.spark.sql.catalyst.catalog.CatalogTable` once we converge Hive tables
- *              and data source tables.
+ * @param table the metadata of the table.
  * @param partition a map from the partition key to the partition value (optional). If the partition
  *                  value is optional, dynamic partition insert will be performed.
  *                  As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have
@@ -74,7 +74,7 @@ import org.apache.spark.SparkException
  * @param ifNotExists If true, only write if the table or partition does not exist.
  */
 case class InsertIntoHiveTable(
-    table: MetastoreRelation,
+    table: CatalogTable,
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
@@ -218,10 +218,19 @@ case class InsertIntoHiveTable(
     val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
     val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
+    val hiveQlTable = HiveClientImpl.toHiveTable(table)
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
     // instances within the closure, since Serializer is not serializable while TableDesc is.
-    val tableDesc = table.tableDesc
-    val tableLocation = table.hiveQlTable.getDataLocation
+    val tableDesc = new TableDesc(
+      hiveQlTable.getInputFormatClass,
+      // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
+      // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
+      // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
+      // HiveSequenceFileOutputFormat.
+      hiveQlTable.getOutputFormatClass,
+      hiveQlTable.getMetadata
+    )
+    val tableLocation = hiveQlTable.getDataLocation
     val tmpLocation =
       getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
@@ -254,9 +263,9 @@ case class InsertIntoHiveTable(
     // By this time, the partition map must match the table's partition columns
     if (partitionColumnNames.toSet != partition.keySet) {
       throw new SparkException(
-        s"""Requested partitioning does not match the ${table.tableName} table:
+        s"""Requested partitioning does not match the ${table.identifier.table} table:
            |Requested partitions: ${partition.keys.mkString(",")}
-           |Table partitions: ${table.partitionKeys.map(_.name).mkString(",")}""".stripMargin)
+           |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
     }
 
     // Validate partition spec if there exist any dynamic partitions
@@ -307,8 +316,8 @@ case class InsertIntoHiveTable(
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
         externalCatalog.loadDynamicPartitions(
-          db = table.catalogTable.database,
-          table = table.catalogTable.identifier.table,
+          db = table.database,
+          table = table.identifier.table,
           tmpLocation.toString,
           partitionSpec,
           overwrite,
@@ -320,8 +329,8 @@ case class InsertIntoHiveTable(
         // scalastyle:on
         val oldPart =
           externalCatalog.getPartitionOption(
-            table.catalogTable.database,
-            table.catalogTable.identifier.table,
+            table.database,
+            table.identifier.table,
             partitionSpec)
 
         var doHiveOverwrite = overwrite
@@ -350,8 +359,8 @@ case class InsertIntoHiveTable(
           // which is currently considered as a Hive native command.
           val inheritTableSpecs = true
           externalCatalog.loadPartition(
-            table.catalogTable.database,
-            table.catalogTable.identifier.table,
+            table.database,
+            table.identifier.table,
             tmpLocation.toString,
             partitionSpec,
             isOverwrite = doHiveOverwrite,
@@ -361,8 +370,8 @@ case class InsertIntoHiveTable(
       }
     } else {
       externalCatalog.loadTable(
-        table.catalogTable.database,
-        table.catalogTable.identifier.table,
+        table.database,
+        table.identifier.table,
         tmpLocation.toString, // TODO: URI
         overwrite,
         isSrcLocal = false)
@@ -378,8 +387,8 @@ case class InsertIntoHiveTable(
     }
 
     // Invalidate the cache.
-    sparkSession.sharedState.cacheManager.invalidateCache(table)
-    sparkSession.sessionState.catalog.refreshTable(table.catalogTable.identifier)
+    sparkSession.catalog.uncacheTable(table.qualifiedName)
+    sparkSession.sessionState.catalog.refreshTable(table.identifier)
 
     // It would be nice to just return the childRdd unchanged so insert operations could be chained,
     // however for now we return an empty list to simplify compatibility checks with hive, which

http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
deleted file mode 100644
index 91ff711..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
-
-class MetastoreRelationSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
-  test("makeCopy and toJSON should work") {
-    val table = CatalogTable(
-      identifier = TableIdentifier("test", Some("db")),
-      tableType = CatalogTableType.VIEW,
-      storage = CatalogStorageFormat.empty,
-      schema = StructType(StructField("a", IntegerType, true) :: Nil))
-    val relation = MetastoreRelation("db", "test")(table, null)
-
-    // No exception should be thrown
-    relation.makeCopy(Array("db", "test"))
-    // No exception should be thrown
-    relation.toJSON
-  }
-
-  test("SPARK-17409: Do Not Optimize Query in CTAS (Hive Serde Table) More Than Once") {
-    withTable("bar") {
-      withTempView("foo") {
-        sql("select 0 as id").createOrReplaceTempView("foo")
-        // If we optimize the query in CTAS more than once, the following saveAsTable will fail
-        // with the error: `GROUP BY position 0 is not in select list (valid range is [1, 1])`
-        sql("CREATE TABLE bar AS SELECT * FROM foo group by id")
-        checkAnswer(spark.table("bar"), Row(0) :: Nil)
-        val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier("bar"))
-        assert(tableMetadata.provider == Some("hive"), "the expected table is a Hive serde table")
-      }
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org