You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/08/15 16:05:00 UTC

spark git commit: [SPARK-18464][SQL][FOLLOWUP] support old table which doesn't store schema in table properties

Repository: spark
Updated Branches:
  refs/heads/master bc9902587 -> 14bdb25fd


[SPARK-18464][SQL][FOLLOWUP] support old table which doesn't store schema in table properties

## What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/15900 , to fix one more bug:
When table schema is empty and need to be inferred at runtime, we should not resolve parent plans before the schema has been inferred, or the parent plans will be resolved against an empty schema and may get wrong result for something like `select *`

The fix logic is: introduce `UnresolvedCatalogRelation` as a placeholder. Then we replace it with `LogicalRelation` or `HiveTableRelation` during analysis, so that it's guaranteed that we won't resolve parent plans until the schema has been inferred.

## How was this patch tested?

regression test

Author: Wenchen Fan <we...@databricks.com>

Closes #18907 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14bdb25f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14bdb25f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14bdb25f

Branch: refs/heads/master
Commit: 14bdb25fd76cf1da8f590231833f47b5b7059f11
Parents: bc99025
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Aug 15 09:04:56 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Aug 15 09:04:56 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  7 +--
 .../spark/sql/catalyst/catalog/interface.scala  | 25 ++++++----
 .../catalyst/catalog/SessionCatalogSuite.scala  |  5 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  | 11 ++---
 .../scala/org/apache/spark/sql/Dataset.scala    |  4 +-
 .../execution/OptimizeMetadataOnlyQuery.scala   |  6 +--
 .../datasources/DataSourceStrategy.scala        | 49 +++++++++++---------
 .../spark/sql/execution/datasources/rules.scala |  4 +-
 .../sql/StatisticsCollectionTestBase.scala      |  4 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  6 +--
 .../apache/spark/sql/hive/HiveStrategies.scala  | 17 ++++---
 .../sql/hive/execution/HiveTableScanExec.scala  |  6 +--
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  1 +
 .../apache/spark/sql/hive/StatisticsSuite.scala | 10 ++--
 .../sql/hive/execution/SQLQuerySuite.scala      | 10 ++--
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |  4 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  4 +-
 17 files changed, 90 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e3237a8..6030d90 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -678,12 +678,7 @@ class SessionCatalog(
             child = parser.parsePlan(viewText))
           SubqueryAlias(table, child)
         } else {
-          val tableRelation = CatalogRelation(
-            metadata,
-            // we assume all the columns are nullable.
-            metadata.dataSchema.asNullable.toAttributes,
-            metadata.partitionSchema.asNullable.toAttributes)
-          SubqueryAlias(table, tableRelation)
+          SubqueryAlias(table, UnresolvedCatalogRelation(metadata))
         }
       } else {
         SubqueryAlias(table, tempTables(table))

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index f865106..5a8c4e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -405,11 +404,22 @@ object CatalogTypes {
   type TablePartitionSpec = Map[String, String]
 }
 
+/**
+ * A placeholder for a table relation, which will be replaced by concrete relation like
+ * `LogicalRelation` or `HiveTableRelation`, during analysis.
+ */
+case class UnresolvedCatalogRelation(tableMeta: CatalogTable) extends LeafNode {
+  assert(tableMeta.identifier.database.isDefined)
+  override lazy val resolved: Boolean = false
+  override def output: Seq[Attribute] = Nil
+}
 
 /**
- * A [[LogicalPlan]] that represents a table.
+ * A `LogicalPlan` that represents a hive table.
+ *
+ * TODO: remove this after we completely make hive as a data source.
  */
-case class CatalogRelation(
+case class HiveTableRelation(
     tableMeta: CatalogTable,
     dataCols: Seq[AttributeReference],
     partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
@@ -423,7 +433,7 @@ case class CatalogRelation(
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
   override def equals(relation: Any): Boolean = relation match {
-    case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
+    case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
     case _ => false
   }
 
@@ -431,7 +441,7 @@ case class CatalogRelation(
     Objects.hashCode(tableMeta.identifier, output)
   }
 
-  override lazy val canonicalized: LogicalPlan = copy(
+  override lazy val canonicalized: HiveTableRelation = copy(
     tableMeta = tableMeta.copy(
       storage = CatalogStorageFormat.empty,
       createTime = -1
@@ -444,15 +454,12 @@ case class CatalogRelation(
     })
 
   override def computeStats(): Statistics = {
-    // For data source tables, we will create a `LogicalRelation` and won't call this method, for
-    // hive serde tables, we will always generate a statistics.
-    // TODO: unify the table stats generation.
     tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
       throw new IllegalStateException("table stats must be specified.")
     }
   }
 
-  override def newInstance(): LogicalPlan = copy(
+  override def newInstance(): HiveTableRelation = copy(
     dataCols = dataCols.map(_.newInstance()),
     partitionCols = partitionCols.map(_.newInstance()))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index d2b670d..1cce199 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -518,14 +517,14 @@ abstract class SessionCatalogSuite extends AnalysisTest {
       catalog.setCurrentDatabase("db2")
       // If we explicitly specify the database, we'll look up the relation in that database
       assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
-        .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+        .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
       // Otherwise, we'll first look up a temporary table with the same name
       assert(catalog.lookupRelation(TableIdentifier("tbl1"))
         == SubqueryAlias("tbl1", tempTable1))
       // Then, if that does not exist, look up the relation in the current database
       catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
       assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head
-        .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+        .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 65c9ef4..877051a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -24,11 +24,11 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 
@@ -372,8 +372,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         // Get all input data source or hive relations of the query.
         val srcRelations = df.logicalPlan.collect {
           case LogicalRelation(src: BaseRelation, _, _) => src
-          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
-            relation.tableMeta.identifier
+          case relation: HiveTableRelation => relation.tableMeta.identifier
         }
 
         val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
@@ -383,8 +382,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
           // check hive table relation when overwrite mode
-          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
-            && srcRelations.contains(relation.tableMeta.identifier) =>
+          case relation: HiveTableRelation
+              if srcRelations.contains(relation.tableMeta.identifier) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
           case _ => // OK

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index aa968d8..a9887eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -36,7 +36,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
@@ -2965,7 +2965,7 @@ class Dataset[T] private[sql](
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles
-      case r: CatalogRelation if DDLUtils.isHiveTable(r.tableMeta) =>
+      case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
     }.flatten
     files.toSet.toArray

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 02f45ab..301c4f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -99,7 +99,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
             val partitionData = fsRelation.location.listFiles(Nil, Nil)
             LocalRelation(partAttrs, partitionData.map(_.values))
 
-          case relation: CatalogRelation =>
+          case relation: HiveTableRelation =>
             val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
             val caseInsensitiveProperties =
               CaseInsensitiveMap(relation.tableMeta.storage.properties)
@@ -135,7 +135,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
         val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
         Some((AttributeSet(partAttrs), l))
 
-      case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
         val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
         Some((AttributeSet(partAttrs), relation))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 587b9b4..2370177 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -25,12 +25,12 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
@@ -207,15 +207,16 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
 
 
 /**
- * Replaces [[CatalogRelation]] with data source table if its table provider is not hive.
+ * Replaces [[UnresolvedCatalogRelation]] with concrete relation logical plans.
+ *
+ * TODO: we should remove the special handling for hive tables after completely making hive as a
+ * data source.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
-    val table = r.tableMeta
+  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
-    val catalogProxy = sparkSession.sessionState.catalog
-
-    val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
+    val catalog = sparkSession.sessionState.catalog
+    catalog.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
         val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
         val dataSource =
@@ -232,24 +233,30 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
         LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
       }
-    }).asInstanceOf[LogicalRelation]
+    })
+  }
 
-    if (r.output.isEmpty) {
-      // It's possible that the table schema is empty and need to be inferred at runtime. For this
-      // case, we don't need to change the output of the cached plan.
-      plan
-    } else {
-      plan.copy(output = r.output)
-    }
+  private def readHiveTable(table: CatalogTable): LogicalPlan = {
+    HiveTableRelation(
+      table,
+      // Hive table columns are always nullable.
+      table.dataSchema.asNullable.toAttributes,
+      table.partitionSchema.asNullable.toAttributes)
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
-        if DDLUtils.isDatasourceTable(r.tableMeta) =>
-      i.copy(table = readDataSourceTable(r))
+    case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
+        if DDLUtils.isDatasourceTable(tableMeta) =>
+      i.copy(table = readDataSourceTable(tableMeta))
+
+    case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
+      i.copy(table = readHiveTable(tableMeta))
+
+    case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
+      readDataSourceTable(tableMeta)
 
-    case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) =>
-      readDataSourceTable(r)
+    case UnresolvedCatalogRelation(tableMeta) =>
+      readHiveTable(tableMeta)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index cb8dc1e..84acca2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -382,7 +382,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
       table match {
-        case relation: CatalogRelation =>
+        case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
           preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
         case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
@@ -427,7 +427,7 @@ object PreReadCheck extends (LogicalPlan => Unit) {
 
   private def checkNumInputFileBlockSources(e: Expression, operator: LogicalPlan): Int = {
     operator match {
-      case _: CatalogRelation => 1
+      case _: HiveTableRelation => 1
       case _ @ LogicalRelation(_: HadoopFsRelation, _, _) => 1
       case _: LeafNode => 0
       // UNION ALL has multiple children, but these children do not concurrently use InputFileBlock.

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 4156976..5916cd7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -171,7 +171,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     // Analyze only one column.
     sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
     val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
-      case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
+      case catalogRel: HiveTableRelation => (catalogRel, catalogRel.tableMeta)
       case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
     }.head
     val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 808dc01..8bab059 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -112,7 +112,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   def convertToLogicalRelation(
-      relation: CatalogRelation,
+      relation: HiveTableRelation,
       options: Map[String, String],
       fileFormatClass: Class[_ <: FileFormat],
       fileType: String): LogicalRelation = {
@@ -210,7 +210,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         logicalRelation
       })
     }
-    // The inferred schema may have different filed names as the table schema, we should respect
+    // The inferred schema may have different field names as the table schema, we should respect
     // it, but also respect the exprId in table relation output.
     assert(result.output.length == relation.output.length &&
       result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })
@@ -221,7 +221,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   private def inferIfNeeded(
-      relation: CatalogRelation,
+      relation: HiveTableRelation,
       options: Map[String, String],
       fileFormat: FileFormat,
       fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 9c60d22..ae1e7e7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -21,10 +21,9 @@ import java.io.IOException
 import java.util.Locale
 
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
@@ -116,7 +115,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 
 class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case relation: CatalogRelation
+    case relation: HiveTableRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
       val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
@@ -147,7 +146,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  */
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case InsertIntoTable(r: CatalogRelation, partSpec, query, overwrite, ifPartitionNotExists)
+    case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
         if DDLUtils.isHiveTable(r.tableMeta) =>
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
 
@@ -171,13 +170,13 @@ object HiveAnalysis extends Rule[LogicalPlan] {
 case class RelationConversions(
     conf: SQLConf,
     sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
-  private def isConvertible(relation: CatalogRelation): Boolean = {
+  private def isConvertible(relation: HiveTableRelation): Boolean = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
     serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
       serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
-  private def convert(relation: CatalogRelation): LogicalRelation = {
+  private def convert(relation: HiveTableRelation): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
     if (serde.contains("parquet")) {
       val options = Map(ParquetOptions.MERGE_SCHEMA ->
@@ -194,14 +193,14 @@ case class RelationConversions(
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan transformUp {
       // Write path
-      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifPartitionNotExists)
+      case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
           if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
             !r.isPartitioned && isConvertible(r) =>
         InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
 
       // Read path
-      case relation: CatalogRelation
+      case relation: HiveTableRelation
           if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
         convert(relation)
     }
@@ -229,7 +228,7 @@ private[hive] trait HiveStrategies {
    */
   object HiveTableScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case PhysicalOperation(projectList, predicates, relation: CatalogRelation) =>
+      case PhysicalOperation(projectList, predicates, relation: HiveTableRelation) =>
         // Filter out all predicates that only deal with partition keys, these are given to the
         // hive table scan operator to be used for partition pruning.
         val partitionKeyIds = AttributeSet(relation.partitionCols)

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index e191071..896f24f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution._
@@ -50,7 +50,7 @@ import org.apache.spark.util.Utils
 private[hive]
 case class HiveTableScanExec(
     requestedAttributes: Seq[Attribute],
-    relation: CatalogRelation,
+    relation: HiveTableRelation,
     partitionPruningPred: Seq[Expression])(
     @transient private val sparkSession: SparkSession)
   extends LeafExecNode {
@@ -205,7 +205,7 @@ case class HiveTableScanExec(
     val input: AttributeSeq = relation.output
     HiveTableScanExec(
       requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
-      relation.canonicalized.asInstanceOf[CatalogRelation],
+      relation.canonicalized,
       QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c785aca..e01198d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1372,6 +1372,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         hiveClient.createTable(tableDesc, ignoreIfExists = false)
 
         checkAnswer(spark.table("old"), Row(1, "a"))
+        checkAnswer(sql("select * from old"), Row(1, "a"))
 
         val expectedSchema = StructType(Seq(
           StructField("i", IntegerType, nullable = true),

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 36566bf..71cf79c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -72,7 +72,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
                |LOCATION '${tempDir.toURI}'""".stripMargin)
 
           val relation = spark.table("csv_table").queryExecution.analyzed.children.head
-            .asInstanceOf[CatalogRelation]
+            .asInstanceOf[HiveTableRelation]
 
           val properties = relation.tableMeta.ignoredProperties
           assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0")
@@ -905,7 +905,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   test("estimates the size of a test Hive serde tables") {
     val df = sql("""SELECT * FROM src""")
     val sizes = df.queryExecution.analyzed.collect {
-      case relation: CatalogRelation => relation.stats.sizeInBytes
+      case relation: HiveTableRelation => relation.stats.sizeInBytes
     }
     assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
     assert(sizes(0).equals(BigInt(5812)),
@@ -965,7 +965,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       () => (),
       metastoreQuery,
       metastoreAnswer,
-      implicitly[ClassTag[CatalogRelation]]
+      implicitly[ClassTag[HiveTableRelation]]
     )
   }
 
@@ -979,7 +979,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
     // Assert src has a size smaller than the threshold.
     val sizes = df.queryExecution.analyzed.collect {
-      case relation: CatalogRelation => relation.stats.sizeInBytes
+      case relation: HiveTableRelation => relation.stats.sizeInBytes
     }
     assert(sizes.size === 2 && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold
       && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold,

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 45bbb0c..ef3d9b2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TestUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -454,7 +454,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       case LogicalRelation(r: HadoopFsRelation, _, _) =>
         if (!isDataSourceTable) {
           fail(
-            s"${classOf[CatalogRelation].getCanonicalName} is expected, but found " +
+            s"${classOf[HiveTableRelation].getCanonicalName} is expected, but found " +
               s"${HadoopFsRelation.getClass.getCanonicalName}.")
         }
         userSpecifiedLocation match {
@@ -464,11 +464,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         }
         assert(catalogTable.provider.get === format)
 
-      case r: CatalogRelation =>
+      case r: HiveTableRelation =>
         if (isDataSourceTable) {
           fail(
             s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
-              s"${classOf[CatalogRelation].getCanonicalName}.")
+              s"${classOf[HiveTableRelation].getCanonicalName}.")
         }
         userSpecifiedLocation match {
           case Some(location) =>
@@ -948,7 +948,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
       sql("CREATE TABLE explodeTest (key bigInt)")
       table("explodeTest").queryExecution.analyzed match {
-        case SubqueryAlias(_, r: CatalogRelation) => // OK
+        case SubqueryAlias(_, r: HiveTableRelation) => // OK
         case _ =>
           fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 8c85573..60ccd99 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHive._
@@ -475,7 +475,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
                 }
               } else {
                 queryExecution.analyzed.collectFirst {
-                  case _: CatalogRelation => ()
+                  case _: HiveTableRelation => ()
                 }.getOrElse {
                   fail(s"Expecting no conversion from orc to data sources, " +
                     s"but got:\n$queryExecution")

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 23f21e6..303884d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
@@ -812,7 +812,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
               }
             } else {
               queryExecution.analyzed.collectFirst {
-                case _: CatalogRelation =>
+                case _: HiveTableRelation =>
               }.getOrElse {
                 fail(s"Expecting no conversion from parquet to data sources, " +
                   s"but got:\n$queryExecution")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org