Posted to commits@spark.apache.org by li...@apache.org on 2017/08/16 16:36:38 UTC
spark git commit: [SPARK-18464][SQL][BACKPORT] support old table which doesn't store schema in table properties
Repository: spark
Updated Branches:
refs/heads/branch-2.2 f5ede0d55 -> 2a9697593
[SPARK-18464][SQL][BACKPORT] support old table which doesn't store schema in table properties
backport https://github.com/apache/spark/pull/18907 to branch 2.2
Author: Wenchen Fan <we...@databricks.com>
Closes #18963 from cloud-fan/backport.
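For context, a minimal repro of the scenario this backport supports (a sketch, assuming a Hive-enabled SparkSession named `spark` and a table `old` created directly through the Hive client, so its schema lives only in the metastore rather than in Spark's table properties -- the same situation exercised by the new assertion in MetastoreDataSourcesSuite below):

  import org.apache.spark.sql.Row

  // Both the table API and plain SQL should resolve the legacy table,
  // falling back to the schema recorded in the Hive metastore.
  val byApi = spark.table("old")
  val bySql = spark.sql("select * from old")
  assert(byApi.collect().toSeq == Seq(Row(1, "a")))
  assert(bySql.collect().toSeq == Seq(Row(1, "a")))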
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a969759
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a969759
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a969759
Branch: refs/heads/branch-2.2
Commit: 2a9697593add425efa15d51afb501b6236a78e26
Parents: f5ede0d
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Aug 16 09:36:33 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Aug 16 09:36:33 2017 -0700
----------------------------------------------------------------------
.../sql/catalyst/catalog/SessionCatalog.scala | 7 +--
.../spark/sql/catalyst/catalog/interface.scala | 22 ++++++---
.../catalyst/catalog/SessionCatalogSuite.scala | 4 +-
.../org/apache/spark/sql/DataFrameWriter.scala | 9 ++--
.../scala/org/apache/spark/sql/Dataset.scala | 4 +-
.../execution/OptimizeMetadataOnlyQuery.scala | 6 +--
.../datasources/DataSourceStrategy.scala | 47 +++++++++++---------
.../spark/sql/execution/datasources/rules.scala | 2 +-
.../spark/sql/StatisticsCollectionSuite.scala | 4 +-
.../spark/sql/hive/HiveMetastoreCatalog.scala | 6 +--
.../apache/spark/sql/hive/HiveStrategies.scala | 16 +++----
.../sql/hive/execution/HiveTableScanExec.scala | 6 +--
.../sql/hive/MetastoreDataSourcesSuite.scala | 1 +
.../apache/spark/sql/hive/StatisticsSuite.scala | 10 ++---
.../sql/hive/execution/SQLQuerySuite.scala | 10 ++---
.../spark/sql/hive/orc/OrcQuerySuite.scala | 4 +-
.../apache/spark/sql/hive/parquetSuites.scala | 4 +-
17 files changed, 86 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8d9fb4c..df8f9aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -679,12 +679,7 @@ class SessionCatalog(
child = parser.parsePlan(viewText))
SubqueryAlias(table, child)
} else {
- val tableRelation = CatalogRelation(
- metadata,
- // we assume all the columns are nullable.
- metadata.dataSchema.asNullable.toAttributes,
- metadata.partitionSchema.asNullable.toAttributes)
- SubqueryAlias(table, tableRelation)
+ SubqueryAlias(table, UnresolvedCatalogRelation(metadata))
}
} else {
SubqueryAlias(table, tempTables(table))
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 976d787..5c8e570 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -397,11 +397,22 @@ object CatalogTypes {
type TablePartitionSpec = Map[String, String]
}
+/**
+ * A placeholder for a table relation, which will be replaced by concrete relation like
+ * `LogicalRelation` or `HiveTableRelation`, during analysis.
+ */
+case class UnresolvedCatalogRelation(tableMeta: CatalogTable) extends LeafNode {
+ assert(tableMeta.identifier.database.isDefined)
+ override lazy val resolved: Boolean = false
+ override def output: Seq[Attribute] = Nil
+}
/**
- * A [[LogicalPlan]] that represents a table.
+ * A `LogicalPlan` that represents a hive table.
+ *
+ * TODO: remove this after we completely make hive as a data source.
*/
-case class CatalogRelation(
+case class HiveTableRelation(
tableMeta: CatalogTable,
dataCols: Seq[AttributeReference],
partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
@@ -415,7 +426,7 @@ case class CatalogRelation(
def isPartitioned: Boolean = partitionCols.nonEmpty
override def equals(relation: Any): Boolean = relation match {
- case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
+ case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
case _ => false
}
@@ -434,15 +445,12 @@ case class CatalogRelation(
))
override def computeStats(conf: SQLConf): Statistics = {
- // For data source tables, we will create a `LogicalRelation` and won't call this method, for
- // hive serde tables, we will always generate a statistics.
- // TODO: unify the table stats generation.
tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
throw new IllegalStateException("table stats must be specified.")
}
}
- override def newInstance(): LogicalPlan = copy(
+ override def newInstance(): HiveTableRelation = copy(
dataCols = dataCols.map(_.newInstance()),
partitionCols = partitionCols.map(_.newInstance()))
}
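To illustrate the split introduced above (a hedged sketch, not part of the patch): `UnresolvedCatalogRelation` is only a placeholder carrying the `CatalogTable`, while `HiveTableRelation` is the concrete relation the analyzer substitutes for Hive serde tables, so code inspecting an analyzed plan can tell Hive tables and data source tables apart by type alone:

  import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
  import org.apache.spark.sql.execution.datasources.LogicalRelation

  // Assumes `df` is any analyzed DataFrame; by this point the analyzer has
  // replaced every UnresolvedCatalogRelation with a concrete relation.
  val relationKinds = df.queryExecution.analyzed.collect {
    case r: HiveTableRelation     => s"hive table: ${r.tableMeta.identifier}"
    case LogicalRelation(_, _, t) => s"data source table: ${t.map(_.identifier)}"
  }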
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 5ee729e..9c1b638 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -506,14 +506,14 @@ abstract class SessionCatalogSuite extends PlanTest {
catalog.setCurrentDatabase("db2")
// If we explicitly specify the database, we'll look up the relation in that database
assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
- .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+ .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
// Otherwise, we'll first look up a temporary table with the same name
assert(catalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", tempTable1))
// Then, if that does not exist, look up the relation in the current database
catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head
- .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+ .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b71c5eb..0259fff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
@@ -372,8 +372,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
// Get all input data source or hive relations of the query.
val srcRelations = df.logicalPlan.collect {
case LogicalRelation(src: BaseRelation, _, _) => src
- case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
- relation.tableMeta.identifier
+ case relation: HiveTableRelation => relation.tableMeta.identifier
}
val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
@@ -383,8 +382,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
// check hive table relation when overwrite mode
- case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
- && srcRelations.contains(relation.tableMeta.identifier) =>
+ case relation: HiveTableRelation
+ if srcRelations.contains(relation.tableMeta.identifier) =>
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
case _ => // OK
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 00a5edf..a775fb8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -35,7 +35,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -2777,7 +2777,7 @@ class Dataset[T] private[sql](
fsBasedRelation.inputFiles
case fr: FileRelation =>
fr.inputFiles
- case r: CatalogRelation if DDLUtils.isHiveTable(r.tableMeta) =>
+ case r: HiveTableRelation =>
r.tableMeta.storage.locationUri.map(_.toString).toArray
}.flatten
files.toSet.toArray
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 3c046ce..d59b3c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -101,7 +101,7 @@ case class OptimizeMetadataOnlyQuery(
val partitionData = fsRelation.location.listFiles(Nil, Nil)
LocalRelation(partAttrs, partitionData.map(_.values))
- case relation: CatalogRelation =>
+ case relation: HiveTableRelation =>
val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
val caseInsensitiveProperties =
CaseInsensitiveMap(relation.tableMeta.storage.properties)
@@ -137,7 +137,7 @@ case class OptimizeMetadataOnlyQuery(
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
Some(AttributeSet(partAttrs), l)
- case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+ case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
Some(AttributeSet(partAttrs), relation)
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index ded9303..04ee081 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -207,15 +207,16 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
/**
- * Replaces [[CatalogRelation]] with data source table if its table provider is not hive.
+ * Replaces [[UnresolvedCatalogRelation]] with concrete relation logical plans.
+ *
+ * TODO: we should remove the special handling for hive tables after completely making hive as a
+ * data source.
*/
class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
- private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
- val table = r.tableMeta
+ private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
- val catalogProxy = sparkSession.sessionState.catalog
-
- val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
+ val catalog = sparkSession.sessionState.catalog
+ catalog.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
override def call(): LogicalPlan = {
val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
val dataSource =
@@ -232,24 +233,30 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
}
- }).asInstanceOf[LogicalRelation]
+ })
+ }
- if (r.output.isEmpty) {
- // It's possible that the table schema is empty and need to be inferred at runtime. For this
- // case, we don't need to change the output of the cached plan.
- plan
- } else {
- plan.copy(output = r.output)
- }
+ private def readHiveTable(table: CatalogTable): LogicalPlan = {
+ HiveTableRelation(
+ table,
+ // Hive table columns are always nullable.
+ table.dataSchema.asNullable.toAttributes,
+ table.partitionSchema.asNullable.toAttributes)
}
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
- if DDLUtils.isDatasourceTable(r.tableMeta) =>
- i.copy(table = readDataSourceTable(r))
+ case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
+ if DDLUtils.isDatasourceTable(tableMeta) =>
+ i.copy(table = readDataSourceTable(tableMeta))
+
+ case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
+ i.copy(table = readHiveTable(tableMeta))
+
+ case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
+ readDataSourceTable(tableMeta)
- case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) =>
- readDataSourceTable(r)
+ case UnresolvedCatalogRelation(tableMeta) =>
+ readHiveTable(tableMeta)
}
}
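A usage note on the rewritten FindDataSourceTable rule (a sketch, assuming a Hive-enabled session `spark` and the `old` table from the test below): a raw catalog lookup now returns the `UnresolvedCatalogRelation` placeholder, and only analysis replaces it with a `HiveTableRelation` or `LogicalRelation`:

  import org.apache.spark.sql.catalyst.TableIdentifier
  import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation

  // Raw lookup from the session catalog: still the placeholder.
  val raw = spark.sessionState.catalog.lookupRelation(TableIdentifier("old", Some("default")))
  assert(raw.find(_.isInstanceOf[UnresolvedCatalogRelation]).isDefined)

  // After analysis the placeholder is gone, replaced by a concrete relation.
  val analyzed = spark.table("old").queryExecution.analyzed
  assert(analyzed.find(_.isInstanceOf[UnresolvedCatalogRelation]).isEmpty)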
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 45f2a41..9647f2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -376,7 +376,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
table match {
- case relation: CatalogRelation =>
+ case relation: HiveTableRelation =>
val metadata = relation.tableMeta
preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index ae0f219..86d19af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
import scala.util.Random
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -304,7 +304,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
// Analyze only one column.
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
- case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
+ case catalogRel: HiveTableRelation => (catalogRel, catalogRel.tableMeta)
case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
}.head
val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 9b3cbb6..e1fee9a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -112,7 +112,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
def convertToLogicalRelation(
- relation: CatalogRelation,
+ relation: HiveTableRelation,
options: Map[String, String],
fileFormatClass: Class[_ <: FileFormat],
fileType: String): LogicalRelation = {
@@ -212,7 +212,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
logicalRelation
})
}
- // The inferred schema may have different filed names as the table schema, we should respect
+ // The inferred schema may have different field names as the table schema, we should respect
// it, but also respect the exprId in table relation output.
assert(result.output.length == relation.output.length &&
result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })
@@ -223,7 +223,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
private def inferIfNeeded(
- relation: CatalogRelation,
+ relation: HiveTableRelation,
options: Map[String, String],
fileFormat: FileFormat,
fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 4f090d5..53e500e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
@@ -116,7 +116,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case relation: CatalogRelation
+ case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
val table = relation.tableMeta
// TODO: check if this estimate is valid for tables after partition pruning.
@@ -160,7 +160,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
*/
object HiveAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case InsertIntoTable(r: CatalogRelation, partSpec, query, overwrite, ifPartitionNotExists)
+ case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
if DDLUtils.isHiveTable(r.tableMeta) =>
InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
@@ -184,13 +184,13 @@ object HiveAnalysis extends Rule[LogicalPlan] {
case class RelationConversions(
conf: SQLConf,
sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
- private def isConvertible(relation: CatalogRelation): Boolean = {
+ private def isConvertible(relation: HiveTableRelation): Boolean = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}
- private def convert(relation: CatalogRelation): LogicalRelation = {
+ private def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
if (serde.contains("parquet")) {
val options = Map(ParquetOptions.MERGE_SCHEMA ->
@@ -207,14 +207,14 @@ case class RelationConversions(
override def apply(plan: LogicalPlan): LogicalPlan = {
plan transformUp {
// Write path
- case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifPartitionNotExists)
+ case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
// Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
!r.isPartitioned && isConvertible(r) =>
InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
// Read path
- case relation: CatalogRelation
+ case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
convert(relation)
}
@@ -242,7 +242,7 @@ private[hive] trait HiveStrategies {
*/
object HiveTableScans extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case PhysicalOperation(projectList, predicates, relation: CatalogRelation) =>
+ case PhysicalOperation(projectList, predicates, relation: HiveTableRelation) =>
// Filter out all predicates that only deal with partition keys, these are given to the
// hive table scan operator to be used for partition pruning.
val partitionKeyIds = AttributeSet(relation.partitionCols)
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index e191071..75b076b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution._
@@ -50,7 +50,7 @@ import org.apache.spark.util.Utils
private[hive]
case class HiveTableScanExec(
requestedAttributes: Seq[Attribute],
- relation: CatalogRelation,
+ relation: HiveTableRelation,
partitionPruningPred: Seq[Expression])(
@transient private val sparkSession: SparkSession)
extends LeafExecNode {
@@ -205,7 +205,7 @@ case class HiveTableScanExec(
val input: AttributeSeq = relation.output
HiveTableScanExec(
requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
- relation.canonicalized.asInstanceOf[CatalogRelation],
+ relation.canonicalized.asInstanceOf[HiveTableRelation],
QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index b554694..06a30b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1377,6 +1377,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
hiveClient.createTable(tableDesc, ignoreIfExists = false)
checkAnswer(spark.table("old"), Row(1, "a"))
+ checkAnswer(sql("select * from old"), Row(1, "a"))
val expectedSchema = StructType(Seq(
StructField("i", IntegerType, nullable = true),
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 819180d..a9caad8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.joins._
@@ -60,7 +60,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
|LOCATION '${tempDir.toURI}'""".stripMargin)
val relation = spark.table("csv_table").queryExecution.analyzed.children.head
- .asInstanceOf[CatalogRelation]
+ .asInstanceOf[HiveTableRelation]
val properties = relation.tableMeta.properties
assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0")
@@ -497,7 +497,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
test("estimates the size of a test Hive serde tables") {
val df = sql("""SELECT * FROM src""")
val sizes = df.queryExecution.analyzed.collect {
- case relation: CatalogRelation => relation.stats(conf).sizeInBytes
+ case relation: HiveTableRelation => relation.stats(conf).sizeInBytes
}
assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizes(0).equals(BigInt(5812)),
@@ -557,7 +557,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
() => (),
metastoreQuery,
metastoreAnswer,
- implicitly[ClassTag[CatalogRelation]]
+ implicitly[ClassTag[HiveTableRelation]]
)
}
@@ -571,7 +571,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
// Assert src has a size smaller than the threshold.
val sizes = df.queryExecution.analyzed.collect {
- case relation: CatalogRelation => relation.stats(conf).sizeInBytes
+ case relation: HiveTableRelation => relation.stats(conf).sizeInBytes
}
assert(sizes.size === 2 && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold
&& sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold,
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 002ddd4..dc79bfa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TestUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -454,7 +454,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
case LogicalRelation(r: HadoopFsRelation, _, _) =>
if (!isDataSourceTable) {
fail(
- s"${classOf[CatalogRelation].getCanonicalName} is expected, but found " +
+ s"${classOf[HiveTableRelation].getCanonicalName} is expected, but found " +
s"${HadoopFsRelation.getClass.getCanonicalName}.")
}
userSpecifiedLocation match {
@@ -464,11 +464,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
assert(catalogTable.provider.get === format)
- case r: CatalogRelation =>
+ case r: HiveTableRelation =>
if (isDataSourceTable) {
fail(
s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
- s"${classOf[CatalogRelation].getCanonicalName}.")
+ s"${classOf[HiveTableRelation].getCanonicalName}.")
}
userSpecifiedLocation match {
case Some(location) =>
@@ -948,7 +948,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
sql("CREATE TABLE explodeTest (key bigInt)")
table("explodeTest").queryExecution.analyzed match {
- case SubqueryAlias(_, r: CatalogRelation) => // OK
+ case SubqueryAlias(_, r: HiveTableRelation) => // OK
case _ =>
fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 8c85573..60ccd99 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHive._
@@ -475,7 +475,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}
} else {
queryExecution.analyzed.collectFirst {
- case _: CatalogRelation => ()
+ case _: HiveTableRelation => ()
}.getOrElse {
fail(s"Expecting no conversion from orc to data sources, " +
s"but got:\n$queryExecution")
http://git-wip-us.apache.org/repos/asf/spark/blob/2a969759/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 23f21e6..303884d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,7 +21,7 @@ import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
@@ -812,7 +812,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
}
} else {
queryExecution.analyzed.collectFirst {
- case _: CatalogRelation =>
+ case _: HiveTableRelation =>
}.getOrElse {
fail(s"Expecting no conversion from parquet to data sources, " +
s"but got:\n$queryExecution")