Posted to commits@spark.apache.org by li...@apache.org on 2017/08/16 16:36:38 UTC

spark git commit: [SPARK-18464][SQL][BACKPORT] support old table which doesn't store schema in table properties

Repository: spark
Updated Branches:
  refs/heads/branch-2.2 f5ede0d55 -> 2a9697593


[SPARK-18464][SQL][BACKPORT] support old table which doesn't store schema in table properties

Backport https://github.com/apache/spark/pull/18907 to branch-2.2.

Author: Wenchen Fan <we...@databricks.com>

Closes #18963 from cloud-fan/backport.
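
For context, the regression this fixes involves tables created directly in the Hive metastore, whose schema lives only in the metastore and not in Spark's table properties. A minimal sketch of what is expected to work after this patch, assuming such a legacy table named "old" exists with the row used by the new MetastoreDataSourcesSuite check below (the table name and values mirror that test; this is illustrative, not part of the patch):

    import org.apache.spark.sql.Row

    // Both the DataFrame API path and the SQL path should read the legacy table,
    // inferring its schema at runtime instead of relying on table properties.
    val viaApi = spark.table("old").collect()
    val viaSql = spark.sql("SELECT * FROM old").collect()
    assert(viaApi.toSeq == Seq(Row(1, "a")))
    assert(viaSql.toSeq == Seq(Row(1, "a")))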


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a969759
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a969759
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a969759

Branch: refs/heads/branch-2.2
Commit: 2a9697593add425efa15d51afb501b6236a78e26
Parents: f5ede0d
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Aug 16 09:36:33 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Aug 16 09:36:33 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  7 +--
 .../spark/sql/catalyst/catalog/interface.scala  | 22 ++++++---
 .../catalyst/catalog/SessionCatalogSuite.scala  |  4 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  9 ++--
 .../scala/org/apache/spark/sql/Dataset.scala    |  4 +-
 .../execution/OptimizeMetadataOnlyQuery.scala   |  6 +--
 .../datasources/DataSourceStrategy.scala        | 47 +++++++++++---------
 .../spark/sql/execution/datasources/rules.scala |  2 +-
 .../spark/sql/StatisticsCollectionSuite.scala   |  4 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  6 +--
 .../apache/spark/sql/hive/HiveStrategies.scala  | 16 +++----
 .../sql/hive/execution/HiveTableScanExec.scala  |  6 +--
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  1 +
 .../apache/spark/sql/hive/StatisticsSuite.scala | 10 ++---
 .../sql/hive/execution/SQLQuerySuite.scala      | 10 ++---
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |  4 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  4 +-
 17 files changed, 86 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8d9fb4c..df8f9aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -679,12 +679,7 @@ class SessionCatalog(
             child = parser.parsePlan(viewText))
           SubqueryAlias(table, child)
         } else {
-          val tableRelation = CatalogRelation(
-            metadata,
-            // we assume all the columns are nullable.
-            metadata.dataSchema.asNullable.toAttributes,
-            metadata.partitionSchema.asNullable.toAttributes)
-          SubqueryAlias(table, tableRelation)
+          SubqueryAlias(table, UnresolvedCatalogRelation(metadata))
         }
       } else {
         SubqueryAlias(table, tempTables(table))

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 976d787..5c8e570 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -397,11 +397,22 @@ object CatalogTypes {
   type TablePartitionSpec = Map[String, String]
 }
 
+/**
+ * A placeholder for a table relation, which will be replaced by concrete relation like
+ * `LogicalRelation` or `HiveTableRelation`, during analysis.
+ */
+case class UnresolvedCatalogRelation(tableMeta: CatalogTable) extends LeafNode {
+  assert(tableMeta.identifier.database.isDefined)
+  override lazy val resolved: Boolean = false
+  override def output: Seq[Attribute] = Nil
+}
 
 /**
- * A [[LogicalPlan]] that represents a table.
+ * A `LogicalPlan` that represents a hive table.
+ *
+ * TODO: remove this after we completely make hive as a data source.
  */
-case class CatalogRelation(
+case class HiveTableRelation(
     tableMeta: CatalogTable,
     dataCols: Seq[AttributeReference],
     partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
@@ -415,7 +426,7 @@ case class CatalogRelation(
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
   override def equals(relation: Any): Boolean = relation match {
-    case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
+    case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
     case _ => false
   }
 
@@ -434,15 +445,12 @@ case class CatalogRelation(
   ))
 
   override def computeStats(conf: SQLConf): Statistics = {
-    // For data source tables, we will create a `LogicalRelation` and won't call this method, for
-    // hive serde tables, we will always generate a statistics.
-    // TODO: unify the table stats generation.
     tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
       throw new IllegalStateException("table stats must be specified.")
     }
   }
 
-  override def newInstance(): LogicalPlan = copy(
+  override def newInstance(): HiveTableRelation = copy(
     dataCols = dataCols.map(_.newInstance()),
     partitionCols = partitionCols.map(_.newInstance()))
 }

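As a quick illustration of the placeholder contract introduced above (a hedged sketch, not part of the patch; the table metadata below is hypothetical):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog._
    import org.apache.spark.sql.types._

    // Hypothetical metadata; UnresolvedCatalogRelation requires a defined database.
    val meta = CatalogTable(
      identifier = TableIdentifier("old", Some("default")),
      tableType = CatalogTableType.MANAGED,
      storage = CatalogStorageFormat.empty,
      schema = new StructType().add("i", IntegerType).add("c", StringType))

    val placeholder = UnresolvedCatalogRelation(meta)
    assert(!placeholder.resolved)       // analysis must replace it with a concrete relation
    assert(placeholder.output.isEmpty)  // attributes only exist on the concrete relation
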
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 5ee729e..9c1b638 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -506,14 +506,14 @@ abstract class SessionCatalogSuite extends PlanTest {
       catalog.setCurrentDatabase("db2")
       // If we explicitly specify the database, we'll look up the relation in that database
       assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
-        .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+        .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
       // Otherwise, we'll first look up a temporary table with the same name
       assert(catalog.lookupRelation(TableIdentifier("tbl1"))
         == SubqueryAlias("tbl1", tempTable1))
       // Then, if that does not exist, look up the relation in the current database
       catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
       assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head
-        .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+        .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b71c5eb..0259fff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
@@ -372,8 +372,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         // Get all input data source or hive relations of the query.
         val srcRelations = df.logicalPlan.collect {
           case LogicalRelation(src: BaseRelation, _, _) => src
-          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
-            relation.tableMeta.identifier
+          case relation: HiveTableRelation => relation.tableMeta.identifier
         }
 
         val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
@@ -383,8 +382,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
           // check hive table relation when overwrite mode
-          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
-            && srcRelations.contains(relation.tableMeta.identifier) =>
+          case relation: HiveTableRelation
+              if srcRelations.contains(relation.tableMeta.identifier) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
           case _ => // OK

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 00a5edf..a775fb8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -35,7 +35,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -2777,7 +2777,7 @@ class Dataset[T] private[sql](
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles
-      case r: CatalogRelation if DDLUtils.isHiveTable(r.tableMeta) =>
+      case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
     }.flatten
     files.toSet.toArray

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 3c046ce..d59b3c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -101,7 +101,7 @@ case class OptimizeMetadataOnlyQuery(
             val partitionData = fsRelation.location.listFiles(Nil, Nil)
             LocalRelation(partAttrs, partitionData.map(_.values))
 
-          case relation: CatalogRelation =>
+          case relation: HiveTableRelation =>
             val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
             val caseInsensitiveProperties =
               CaseInsensitiveMap(relation.tableMeta.storage.properties)
@@ -137,7 +137,7 @@ case class OptimizeMetadataOnlyQuery(
         val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
         Some(AttributeSet(partAttrs), l)
 
-      case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
         val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
         Some(AttributeSet(partAttrs), relation)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index ded9303..04ee081 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -207,15 +207,16 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
 
 
 /**
- * Replaces [[CatalogRelation]] with data source table if its table provider is not hive.
+ * Replaces [[UnresolvedCatalogRelation]] with concrete relation logical plans.
+ *
+ * TODO: we should remove the special handling for hive tables after completely making hive as a
+ * data source.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
-    val table = r.tableMeta
+  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
-    val catalogProxy = sparkSession.sessionState.catalog
-
-    val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
+    val catalog = sparkSession.sessionState.catalog
+    catalog.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
         val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
         val dataSource =
@@ -232,24 +233,30 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
         LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
       }
-    }).asInstanceOf[LogicalRelation]
+    })
+  }
 
-    if (r.output.isEmpty) {
-      // It's possible that the table schema is empty and need to be inferred at runtime. For this
-      // case, we don't need to change the output of the cached plan.
-      plan
-    } else {
-      plan.copy(output = r.output)
-    }
+  private def readHiveTable(table: CatalogTable): LogicalPlan = {
+    HiveTableRelation(
+      table,
+      // Hive table columns are always nullable.
+      table.dataSchema.asNullable.toAttributes,
+      table.partitionSchema.asNullable.toAttributes)
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
-        if DDLUtils.isDatasourceTable(r.tableMeta) =>
-      i.copy(table = readDataSourceTable(r))
+    case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
+        if DDLUtils.isDatasourceTable(tableMeta) =>
+      i.copy(table = readDataSourceTable(tableMeta))
+
+    case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
+      i.copy(table = readHiveTable(tableMeta))
+
+    case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
+      readDataSourceTable(tableMeta)
 
-    case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) =>
-      readDataSourceTable(r)
+    case UnresolvedCatalogRelation(tableMeta) =>
+      readHiveTable(tableMeta)
   }
 }
 

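After analysis, the FindDataSourceTable rule above leaves either a LogicalRelation (data source tables) or a HiveTableRelation (Hive serde tables) where the placeholder stood. A hedged way to observe which branch was taken for a given table (table name illustrative, reusing "old" from the test scenario):

    import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
    import org.apache.spark.sql.execution.datasources.LogicalRelation

    // Inspect the analyzed plan to see what the placeholder resolved to.
    spark.table("old").queryExecution.analyzed.collectFirst {
      case r: HiveTableRelation => s"hive serde table: ${r.tableMeta.identifier}"
      case l: LogicalRelation   => s"data source table: ${l.catalogTable.map(_.identifier)}"
    }
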
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 45f2a41..9647f2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -376,7 +376,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
       table match {
-        case relation: CatalogRelation =>
+        case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
           preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
         case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index ae0f219..86d19af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -304,7 +304,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     // Analyze only one column.
     sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
     val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
-      case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
+      case catalogRel: HiveTableRelation => (catalogRel, catalogRel.tableMeta)
       case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
     }.head
     val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 9b3cbb6..e1fee9a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -112,7 +112,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   def convertToLogicalRelation(
-      relation: CatalogRelation,
+      relation: HiveTableRelation,
       options: Map[String, String],
       fileFormatClass: Class[_ <: FileFormat],
       fileType: String): LogicalRelation = {
@@ -212,7 +212,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         logicalRelation
       })
     }
-    // The inferred schema may have different filed names as the table schema, we should respect
+    // The inferred schema may have different field names as the table schema, we should respect
     // it, but also respect the exprId in table relation output.
     assert(result.output.length == relation.output.length &&
       result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })
@@ -223,7 +223,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   private def inferIfNeeded(
-      relation: CatalogRelation,
+      relation: HiveTableRelation,
       options: Map[String, String],
       fileFormat: FileFormat,
       fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 4f090d5..53e500e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.StatsSetupConst
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
@@ -116,7 +116,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 
 class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case relation: CatalogRelation
+    case relation: HiveTableRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
       // TODO: check if this estimate is valid for tables after partition pruning.
@@ -160,7 +160,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  */
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case InsertIntoTable(r: CatalogRelation, partSpec, query, overwrite, ifPartitionNotExists)
+    case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
         if DDLUtils.isHiveTable(r.tableMeta) =>
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
 
@@ -184,13 +184,13 @@ object HiveAnalysis extends Rule[LogicalPlan] {
 case class RelationConversions(
     conf: SQLConf,
     sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
-  private def isConvertible(relation: CatalogRelation): Boolean = {
+  private def isConvertible(relation: HiveTableRelation): Boolean = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
     serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
       serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
-  private def convert(relation: CatalogRelation): LogicalRelation = {
+  private def convert(relation: HiveTableRelation): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
     if (serde.contains("parquet")) {
       val options = Map(ParquetOptions.MERGE_SCHEMA ->
@@ -207,14 +207,14 @@ case class RelationConversions(
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan transformUp {
       // Write path
-      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifPartitionNotExists)
+      case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
           if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
             !r.isPartitioned && isConvertible(r) =>
         InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
 
       // Read path
-      case relation: CatalogRelation
+      case relation: HiveTableRelation
           if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
         convert(relation)
     }
@@ -242,7 +242,7 @@ private[hive] trait HiveStrategies {
    */
   object HiveTableScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case PhysicalOperation(projectList, predicates, relation: CatalogRelation) =>
+      case PhysicalOperation(projectList, predicates, relation: HiveTableRelation) =>
         // Filter out all predicates that only deal with partition keys, these are given to the
         // hive table scan operator to be used for partition pruning.
         val partitionKeyIds = AttributeSet(relation.partitionCols)

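The read-path conversion above is what the orc/parquet suites later in this diff exercise: with the conversion conf disabled, the analyzed plan should keep the HiveTableRelation rather than converting it to a LogicalRelation. A hedged sketch of checking this from user code (the conf key is the one backing HiveUtils.CONVERT_METASTORE_PARQUET; the table name is illustrative):

    import org.apache.spark.sql.catalyst.catalog.HiveTableRelation

    spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
    // With conversion off, a Parquet serde table stays a HiveTableRelation after analysis.
    val keptAsHiveRelation = spark.table("parquet_serde_tbl").queryExecution.analyzed.collectFirst {
      case _: HiveTableRelation => true
    }.isDefined
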
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index e191071..75b076b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution._
@@ -50,7 +50,7 @@ import org.apache.spark.util.Utils
 private[hive]
 case class HiveTableScanExec(
     requestedAttributes: Seq[Attribute],
-    relation: CatalogRelation,
+    relation: HiveTableRelation,
     partitionPruningPred: Seq[Expression])(
     @transient private val sparkSession: SparkSession)
   extends LeafExecNode {
@@ -205,7 +205,7 @@ case class HiveTableScanExec(
     val input: AttributeSeq = relation.output
     HiveTableScanExec(
       requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
-      relation.canonicalized.asInstanceOf[CatalogRelation],
+      relation.canonicalized.asInstanceOf[HiveTableRelation],
       QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index b554694..06a30b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1377,6 +1377,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         hiveClient.createTable(tableDesc, ignoreIfExists = false)
 
         checkAnswer(spark.table("old"), Row(1, "a"))
+        checkAnswer(sql("select * from old"), Row(1, "a"))
 
         val expectedSchema = StructType(Seq(
           StructField("i", IntegerType, nullable = true),

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 819180d..a9caad8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.joins._
@@ -60,7 +60,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
                |LOCATION '${tempDir.toURI}'""".stripMargin)
 
           val relation = spark.table("csv_table").queryExecution.analyzed.children.head
-            .asInstanceOf[CatalogRelation]
+            .asInstanceOf[HiveTableRelation]
 
           val properties = relation.tableMeta.properties
           assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0")
@@ -497,7 +497,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   test("estimates the size of a test Hive serde tables") {
     val df = sql("""SELECT * FROM src""")
     val sizes = df.queryExecution.analyzed.collect {
-      case relation: CatalogRelation => relation.stats(conf).sizeInBytes
+      case relation: HiveTableRelation => relation.stats(conf).sizeInBytes
     }
     assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
     assert(sizes(0).equals(BigInt(5812)),
@@ -557,7 +557,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       () => (),
       metastoreQuery,
       metastoreAnswer,
-      implicitly[ClassTag[CatalogRelation]]
+      implicitly[ClassTag[HiveTableRelation]]
     )
   }
 
@@ -571,7 +571,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
     // Assert src has a size smaller than the threshold.
     val sizes = df.queryExecution.analyzed.collect {
-      case relation: CatalogRelation => relation.stats(conf).sizeInBytes
+      case relation: HiveTableRelation => relation.stats(conf).sizeInBytes
     }
     assert(sizes.size === 2 && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold
       && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold,

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 002ddd4..dc79bfa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TestUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -454,7 +454,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       case LogicalRelation(r: HadoopFsRelation, _, _) =>
         if (!isDataSourceTable) {
           fail(
-            s"${classOf[CatalogRelation].getCanonicalName} is expected, but found " +
+            s"${classOf[HiveTableRelation].getCanonicalName} is expected, but found " +
               s"${HadoopFsRelation.getClass.getCanonicalName}.")
         }
         userSpecifiedLocation match {
@@ -464,11 +464,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         }
         assert(catalogTable.provider.get === format)
 
-      case r: CatalogRelation =>
+      case r: HiveTableRelation =>
         if (isDataSourceTable) {
           fail(
             s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
-              s"${classOf[CatalogRelation].getCanonicalName}.")
+              s"${classOf[HiveTableRelation].getCanonicalName}.")
         }
         userSpecifiedLocation match {
           case Some(location) =>
@@ -948,7 +948,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
       sql("CREATE TABLE explodeTest (key bigInt)")
       table("explodeTest").queryExecution.analyzed match {
-        case SubqueryAlias(_, r: CatalogRelation) => // OK
+        case SubqueryAlias(_, r: HiveTableRelation) => // OK
         case _ =>
           fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 8c85573..60ccd99 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHive._
@@ -475,7 +475,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
                 }
               } else {
                 queryExecution.analyzed.collectFirst {
-                  case _: CatalogRelation => ()
+                  case _: HiveTableRelation => ()
                 }.getOrElse {
                   fail(s"Expecting no conversion from orc to data sources, " +
                     s"but got:\n$queryExecution")

http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 23f21e6..303884d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
@@ -812,7 +812,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
               }
             } else {
               queryExecution.analyzed.collectFirst {
-                case _: CatalogRelation =>
+                case _: HiveTableRelation =>
               }.getOrElse {
                 fail(s"Expecting no conversion from parquet to data sources, " +
                   s"but got:\n$queryExecution")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org