You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/10/27 21:22:36 UTC
[2/2] spark git commit: [SPARK-17970][SQL] store partition spec in metastore for data source table
[SPARK-17970][SQL] store partition spec in metastore for data source table
## What changes were proposed in this pull request?
We should follow Hive tables and also store the partition spec in the metastore for data source tables.
This brings 2 benefits:
1. Table data files become more flexible to manage, as users can use `ADD PARTITION`, `DROP PARTITION` and `RENAME PARTITION`.
2. We no longer need to cache the file status of every file in a data source table.
## How was this patch tested?
Existing tests.
Author: Eric Liang <ek...@databricks.com>
Author: Michael Allman <mi...@videoamp.com>
Author: Eric Liang <ek...@gmail.com>
Author: Wenchen Fan <we...@databricks.com>
Closes #15515 from cloud-fan/partition.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccb11543
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccb11543
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccb11543
Branch: refs/heads/master
Commit: ccb11543048dccd4cc590a8db1df1d9d5847d112
Parents: 79fd0cc
Author: Eric Liang <ek...@databricks.com>
Authored: Thu Oct 27 14:22:30 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Oct 27 14:22:30 2016 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/catalog/interface.scala | 12 +-
.../sql/catalyst/trees/TreeNodeSuite.scala | 1 +
.../org/apache/spark/sql/DataFrameWriter.scala | 13 +-
.../command/AnalyzeColumnCommand.scala | 3 +-
.../execution/command/AnalyzeTableCommand.scala | 3 +-
.../command/createDataSourceTables.scala | 17 +-
.../spark/sql/execution/command/ddl.scala | 90 +++---
.../spark/sql/execution/command/tables.scala | 39 +--
.../sql/execution/datasources/DataSource.scala | 20 +-
.../datasources/DataSourceStrategy.scala | 15 +-
.../sql/execution/datasources/FileCatalog.scala | 4 +
.../execution/datasources/FileStatusCache.scala | 2 +-
.../PartitioningAwareFileCatalog.scala | 12 +-
.../datasources/TableFileCatalog.scala | 4 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 16 +-
.../apache/spark/sql/SQLQueryTestSuite.scala | 2 +-
.../spark/sql/execution/command/DDLSuite.scala | 200 +++++-------
.../spark/sql/hive/HiveExternalCatalog.scala | 129 +++++---
.../spark/sql/hive/HiveMetastoreCatalog.scala | 9 +-
.../spark/sql/hive/client/HiveClientImpl.scala | 5 +-
.../spark/sql/hive/HiveMetadataCacheSuite.scala | 2 +-
.../sql/hive/HiveTablePerfStatsSuite.scala | 240 ---------------
.../PartitionProviderCompatibilitySuite.scala | 137 +++++++++
.../hive/PartitionedTablePerfStatsSuite.scala | 304 +++++++++++++++++++
.../apache/spark/sql/hive/StatisticsSuite.scala | 65 ++--
.../sql/hive/execution/HiveCommandSuite.scala | 5 +-
.../sql/hive/execution/SQLQuerySuite.scala | 8 +-
27 files changed, 812 insertions(+), 545 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index a97ed70..7c3bec8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -89,9 +89,10 @@ case class CatalogTablePartition(
parameters: Map[String, String] = Map.empty) {
override def toString: String = {
+ val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
val output =
Seq(
- s"Partition Values: [${spec.values.mkString(", ")}]",
+ s"Partition Values: [$specString]",
s"$storage",
s"Partition Parameters:{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
@@ -137,6 +138,8 @@ case class BucketSpec(
* Can be None if this table is a View, should be "hive" for hive serde tables.
* @param unsupportedFeatures is a list of string descriptions of features that are used by the
* underlying table but not supported by Spark SQL yet.
+ * @param partitionProviderIsHive whether this table's partition metadata is stored in the Hive
+ * metastore.
*/
case class CatalogTable(
identifier: TableIdentifier,
@@ -154,7 +157,8 @@ case class CatalogTable(
viewOriginalText: Option[String] = None,
viewText: Option[String] = None,
comment: Option[String] = None,
- unsupportedFeatures: Seq[String] = Seq.empty) {
+ unsupportedFeatures: Seq[String] = Seq.empty,
+ partitionProviderIsHive: Boolean = false) {
/** schema of this table's partition columns */
def partitionSchema: StructType = StructType(schema.filter {
@@ -212,11 +216,11 @@ case class CatalogTable(
comment.map("Comment: " + _).getOrElse(""),
if (properties.nonEmpty) s"Properties: $tableProperties" else "",
if (stats.isDefined) s"Statistics: ${stats.get.simpleString}" else "",
- s"$storage")
+ s"$storage",
+ if (partitionProviderIsHive) "Partition Provider: Hive" else "")
output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
}
-
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index cb0426c..3eff12f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -489,6 +489,7 @@ class TreeNodeSuite extends SparkFunSuite {
"owner" -> "",
"createTime" -> 0,
"lastAccessTime" -> -1,
+ "partitionProviderIsHive" -> false,
"properties" -> JNull,
"unsupportedFeatures" -> List.empty[String]))
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 4b5f024..7ff3522 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,8 @@ import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Union}
+import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
import org.apache.spark.sql.types.StructType
@@ -387,7 +388,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
partitionColumnNames = partitioningColumns.getOrElse(Nil),
bucketSpec = getBucketSpec
)
- val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
+ val createCmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
+ val cmd = if (tableDesc.partitionColumnNames.nonEmpty &&
+ df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
+ // Need to recover partitions into the metastore so our saved data is visible.
+ val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(tableDesc.identifier)
+ Union(createCmd, recoverPartitionCmd)
+ } else {
+ createCmd
+ }
df.sparkSession.sessionState.executePlan(cmd).toRdd
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 4881387..f873f34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -50,7 +50,8 @@ case class AnalyzeColumnCommand(
AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
- updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+ updateStats(logicalRel.catalogTable.get,
+ AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
case otherRelation =>
throw new AnalysisException("ANALYZE TABLE is not supported for " +
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 7b0e49b..52a8fc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -51,7 +51,8 @@ case class AnalyzeTableCommand(
// data source tables have been converted into LogicalRelations
case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
- updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+ updateTableStats(logicalRel.catalogTable.get,
+ AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
case otherRelation =>
throw new AnalysisException("ANALYZE TABLE is not supported for " +
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index a8c75a7..2a97431 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -94,10 +94,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
val newTable = table.copy(
storage = table.storage.copy(properties = optionsWithPath),
schema = dataSource.schema,
- partitionColumnNames = partitionColumnNames)
+ partitionColumnNames = partitionColumnNames,
+ // If metastore partition management for file source tables is enabled, we start off with
+ // partition provider hive, but no partitions in the metastore. The user has to call
+ // `msck repair table` to populate the table partitions.
+ partitionProviderIsHive = partitionColumnNames.nonEmpty &&
+ sparkSession.sessionState.conf.manageFilesourcePartitions)
// We will return Nil or throw exception at the beginning if the table already exists, so when
// we reach here, the table should not exist and we should set `ignoreIfExists` to false.
sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+
Seq.empty[Row]
}
}
@@ -232,6 +238,15 @@ case class CreateDataSourceTableAsSelectCommand(
sessionState.catalog.createTable(newTable, ignoreIfExists = false)
}
+ result match {
+ case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+ sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+ // Need to recover partitions into the metastore so our saved data is visible.
+ sparkSession.sessionState.executePlan(
+ AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+ case _ =>
+ }
+
// Refresh the cache of the table in the catalog.
sessionState.catalog.refreshTable(tableIdentWithDB)
Seq.empty[Row]
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 15656fa..61e0550 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -28,10 +28,11 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
@@ -346,10 +347,7 @@ case class AlterTableAddPartitionCommand(
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- "ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
- }
+ DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE ADD PARTITION")
val parts = partitionSpecsAndLocs.map { case (spec, location) =>
val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
spec,
@@ -382,11 +380,8 @@ case class AlterTableRenamePartitionCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- "ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API")
- }
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+ DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE RENAME PARTITION")
val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec(
oldPartition,
@@ -432,10 +427,7 @@ case class AlterTableDropPartitionCommand(
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
- }
+ DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
val normalizedSpecs = specs.map { spec =>
PartitioningUtils.normalizePartitionSpec(
@@ -493,33 +485,39 @@ case class AlterTableRecoverPartitionsCommand(
}
}
+ private def getBasePath(table: CatalogTable): Option[String] = {
+ if (table.provider == Some("hive")) {
+ table.storage.locationUri
+ } else {
+ new CaseInsensitiveMap(table.storage.properties).get("path")
+ }
+ }
+
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val tableIdentWithDB = table.identifier.quotedString
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- s"Operation not allowed: $cmd on datasource tables: $tableIdentWithDB")
- }
if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
}
- if (table.storage.locationUri.isEmpty) {
+
+ val tablePath = getBasePath(table)
+ if (tablePath.isEmpty) {
throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
s"location provided: $tableIdentWithDB")
}
- val root = new Path(table.storage.locationUri.get)
+ val root = new Path(tablePath.get)
logInfo(s"Recover all the partitions in $root")
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
val hadoopConf = spark.sparkContext.hadoopConfiguration
val pathFilter = getPathFilter(hadoopConf)
- val partitionSpecsAndLocs = scanPartitions(
- spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
+ val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
+ table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
val total = partitionSpecsAndLocs.length
logInfo(s"Found $total partitions in $root")
@@ -531,6 +529,11 @@ case class AlterTableRecoverPartitionsCommand(
logInfo(s"Finished to gather the fast stats for all $total partitions.")
addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+ // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
+ // This is always the case for Hive format tables, but is not true for Datasource tables created
+ // before Spark 2.1 unless they are converted via `msck repair table`.
+ spark.sessionState.catalog.alterTable(table.copy(partitionProviderIsHive = true))
+ catalog.refreshTable(tableName)
logInfo(s"Recovered all partitions ($total).")
Seq.empty[Row]
}
@@ -544,7 +547,8 @@ case class AlterTableRecoverPartitionsCommand(
path: Path,
spec: TablePartitionSpec,
partitionNames: Seq[String],
- threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
+ threshold: Int,
+ resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
if (partitionNames.isEmpty) {
return Seq(spec -> path)
}
@@ -563,15 +567,15 @@ case class AlterTableRecoverPartitionsCommand(
val name = st.getPath.getName
if (st.isDirectory && name.contains("=")) {
val ps = name.split("=", 2)
- val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+ val columnName = PartitioningUtils.unescapePathName(ps(0))
// TODO: Validate the value
val value = PartitioningUtils.unescapePathName(ps(1))
- // comparing with case-insensitive, but preserve the case
- if (columnName == partitionNames.head) {
- scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
- partitionNames.drop(1), threshold)
+ if (resolver(columnName, partitionNames.head)) {
+ scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
+ partitionNames.drop(1), threshold, resolver)
} else {
- logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it")
+ logWarning(
+ s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
Seq()
}
} else {
@@ -676,16 +680,11 @@ case class AlterTableSetLocationCommand(
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
partitionSpec match {
case Some(spec) =>
+ DDLUtils.verifyPartitionProviderIsHive(
+ sparkSession, table, "ALTER TABLE ... SET LOCATION")
// Partition spec is specified, so we set the location only for this partition
val part = catalog.getPartition(table.identifier, spec)
- val newPart =
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- "ALTER TABLE SET LOCATION for partition is not allowed for tables defined " +
- "using the datasource API")
- } else {
- part.copy(storage = part.storage.copy(locationUri = Some(location)))
- }
+ val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location)))
catalog.alterPartitions(table.identifier, Seq(newPart))
case None =>
// No partition spec is specified, so we set the location for the table itself
@@ -710,6 +709,25 @@ object DDLUtils {
}
/**
+ * Throws a standard error for actions that require partitionProvider = hive.
+ */
+ def verifyPartitionProviderIsHive(
+ spark: SparkSession, table: CatalogTable, action: String): Unit = {
+ val tableName = table.identifier.table
+ if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
+ throw new AnalysisException(
+ s"$action is not allowed on $tableName since filesource partition management is " +
+ "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
+ }
+ if (!table.partitionProviderIsHive && isDatasourceTable(table)) {
+ throw new AnalysisException(
+ s"$action is not allowed on $tableName since its partition metadata is not stored in " +
+ "the Hive metastore. To import this information into the metastore, run " +
+ s"`msck repair table $tableName`")
+ }
+ }
+
+ /**
* If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view,
* issue an exception [[AnalysisException]].
*
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index aec2543..4acfffb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -358,19 +358,16 @@ case class TruncateTableCommand(
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentwithDB")
}
- val isDatasourceTable = DDLUtils.isDatasourceTable(table)
- if (isDatasourceTable && partitionSpec.isDefined) {
- throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables created using the data sources API: $tableIdentwithDB")
- }
if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
s"for tables that are not partitioned: $tableIdentwithDB")
}
+ if (partitionSpec.isDefined) {
+ DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
+ }
val locations =
- if (isDatasourceTable) {
+ if (DDLUtils.isDatasourceTable(table)) {
Seq(table.storage.properties.get("path"))
} else if (table.partitionColumnNames.isEmpty) {
Seq(table.storage.locationUri)
@@ -453,7 +450,7 @@ case class DescribeTableCommand(
describeFormattedTableInfo(metadata, result)
}
} else {
- describeDetailedPartitionInfo(catalog, metadata, result)
+ describeDetailedPartitionInfo(sparkSession, catalog, metadata, result)
}
}
@@ -492,6 +489,10 @@ case class DescribeTableCommand(
describeStorageInfo(table, buffer)
if (table.tableType == CatalogTableType.VIEW) describeViewInfo(table, buffer)
+
+ if (DDLUtils.isDatasourceTable(table) && table.partitionProviderIsHive) {
+ append(buffer, "Partition Provider:", "Hive", "")
+ }
}
private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
@@ -528,6 +529,7 @@ case class DescribeTableCommand(
}
private def describeDetailedPartitionInfo(
+ spark: SparkSession,
catalog: SessionCatalog,
metadata: CatalogTable,
result: ArrayBuffer[Row]): Unit = {
@@ -535,10 +537,7 @@ case class DescribeTableCommand(
throw new AnalysisException(
s"DESC PARTITION is not allowed on a view: ${table.identifier}")
}
- if (DDLUtils.isDatasourceTable(metadata)) {
- throw new AnalysisException(
- s"DESC PARTITION is not allowed on a datasource table: ${table.identifier}")
- }
+ DDLUtils.verifyPartitionProviderIsHive(spark, metadata, "DESC PARTITION")
val partition = catalog.getPartition(table, partitionSpec)
if (isExtended) {
describeExtendedDetailedPartitionInfo(table, metadata, partition, result)
@@ -743,10 +742,7 @@ case class ShowPartitionsCommand(
s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
}
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a datasource table: $tableIdentWithDB")
- }
+ DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "SHOW PARTITIONS")
/**
* Validate the partitioning spec by making sure all the referenced columns are
@@ -894,18 +890,11 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
private def showHiveTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
if (metadata.properties.nonEmpty) {
- val filteredProps = metadata.properties.filterNot {
- // Skips "EXTERNAL" property for external tables
- case (key, _) => key == "EXTERNAL" && metadata.tableType == EXTERNAL
- }
-
- val props = filteredProps.map { case (key, value) =>
+ val props = metadata.properties.map { case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
}
- if (props.nonEmpty) {
- builder ++= props.mkString("TBLPROPERTIES (\n ", ",\n ", "\n)\n")
- }
+ builder ++= props.mkString("TBLPROPERTIES (\n ", ",\n ", "\n)\n")
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 17da606..5b8f05a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
@@ -65,6 +65,8 @@ import org.apache.spark.util.Utils
* @param partitionColumns A list of column names that the relation is partitioned by. When this
* list is empty, the relation is unpartitioned.
* @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
+ * @param catalogTable Optional catalog table reference that can be used to push down operations
+ * over the datasource to the catalog service.
*/
case class DataSource(
sparkSession: SparkSession,
@@ -73,7 +75,8 @@ case class DataSource(
userSpecifiedSchema: Option[StructType] = None,
partitionColumns: Seq[String] = Seq.empty,
bucketSpec: Option[BucketSpec] = None,
- options: Map[String, String] = Map.empty) extends Logging {
+ options: Map[String, String] = Map.empty,
+ catalogTable: Option[CatalogTable] = None) extends Logging {
case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
@@ -412,9 +415,16 @@ case class DataSource(
})
}
- val fileCatalog =
+ val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+ catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) {
+ new TableFileCatalog(
+ sparkSession,
+ catalogTable.get,
+ catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
+ } else {
new ListingFileCatalog(
sparkSession, globbedPaths, options, partitionSchema)
+ }
val dataSchema = userSpecifiedSchema.map { schema =>
val equality = sparkSession.sessionState.conf.resolver
@@ -423,7 +433,7 @@ case class DataSource(
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
- fileCatalog.allFiles())
+ fileCatalog.asInstanceOf[ListingFileCatalog].allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
@@ -432,7 +442,7 @@ case class DataSource(
HadoopFsRelation(
fileCatalog,
- partitionSchema = fileCatalog.partitionSpec().partitionColumns,
+ partitionSchema = fileCatalog.partitionSchema,
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 7d0abe8..f0bcf94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -30,11 +30,11 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils, ExecutedCommandExec}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -179,7 +179,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
"Cannot overwrite a path that is also being read from.")
}
- InsertIntoHadoopFsRelationCommand(
+ val insertCmd = InsertIntoHadoopFsRelationCommand(
outputPath,
query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
t.bucketSpec,
@@ -188,6 +188,15 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
t.options,
query,
mode)
+
+ if (l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty &&
+ l.catalogTable.get.partitionProviderIsHive) {
+ // TODO(ekl) we should be more efficient here and only recover the newly added partitions
+ val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(l.catalogTable.get.identifier)
+ Union(insertCmd, recoverPartitionCmd)
+ } else {
+ insertCmd
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
index 2bc66ce..dba6462 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
/**
* A collection of data files from a partitioned relation, along with the partition values in the
@@ -63,4 +64,7 @@ trait FileCatalog {
/** Sum of table file sizes, in bytes */
def sizeInBytes: Long
+
+ /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
+ def partitionSchema: StructType
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index e0ec748..7c2e6fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -64,7 +64,7 @@ object FileStatusCache {
*/
def newCache(session: SparkSession): FileStatusCache = {
synchronized {
- if (session.sqlContext.conf.filesourcePartitionPruning &&
+ if (session.sqlContext.conf.manageFilesourcePartitions &&
session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
if (sharedCache == null) {
sharedCache = new SharedInMemoryCache(
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 9b1903c..cc4049e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -38,19 +38,21 @@ import org.apache.spark.util.SerializableConfiguration
* It provides the necessary methods to parse partition data based on a set of files.
*
* @param parameters as set of options to control partition discovery
- * @param partitionSchema an optional partition schema that will be use to provide types for the
- * discovered partitions
-*/
+ * @param userPartitionSchema an optional partition schema that will be used to provide types for
+ * the discovered partitions
+ */
abstract class PartitioningAwareFileCatalog(
sparkSession: SparkSession,
parameters: Map[String, String],
- partitionSchema: Option[StructType],
+ userPartitionSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging {
import PartitioningAwareFileCatalog.BASE_PATH_PARAM
/** Returns the specification of the partitions inferred from the data. */
def partitionSpec(): PartitionSpec
+ override def partitionSchema: StructType = partitionSpec().partitionColumns
+
protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
@@ -122,7 +124,7 @@ abstract class PartitioningAwareFileCatalog(
val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
files.exists(f => isDataPath(f.getPath))
}.keys.toSeq
- partitionSchema match {
+ userPartitionSchema match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
leafDirs,
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index 667379b..b459df5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
/**
@@ -45,6 +46,8 @@ class TableFileCatalog(
private val baseLocation = table.storage.locationUri
+ override def partitionSchema: StructType = table.partitionSchema
+
override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
@@ -63,7 +66,6 @@ class TableFileCatalog(
if (table.partitionColumnNames.nonEmpty) {
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
- val partitionSchema = table.partitionSchema
val partitions = selectedPartitions.map { p =>
PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f47ec7f..dc31f3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -272,18 +272,20 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
- val HIVE_FILESOURCE_PARTITION_PRUNING =
- SQLConfigBuilder("spark.sql.hive.filesourcePartitionPruning")
- .doc("When true, enable metastore partition pruning for filesource relations as well. " +
- "This is currently implemented for converted Hive tables only.")
+ val HIVE_MANAGE_FILESOURCE_PARTITIONS =
+ SQLConfigBuilder("spark.sql.hive.manageFilesourcePartitions")
+ .doc("When true, enable metastore partition management for file source tables as well. " +
+ "This includes both datasource and converted Hive tables. When partition managment " +
+ "is enabled, datasource tables store partition in the Hive metastore, and use the " +
+ "metastore to prune partitions during query planning.")
.booleanConf
.createWithDefault(true)
val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE =
SQLConfigBuilder("spark.sql.hive.filesourcePartitionFileCacheSize")
- .doc("When nonzero, enable caching of partition file metadata in memory. All table share " +
+ .doc("When nonzero, enable caching of partition file metadata in memory. All tables share " +
"a cache that can use up to specified num bytes for file metadata. This conf only " +
- "applies if filesource partition pruning is also enabled.")
+ "has an effect when hive filesource partition management is enabled.")
.longConf
.createWithDefault(250 * 1024 * 1024)
@@ -679,7 +681,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
- def filesourcePartitionPruning: Boolean = getConf(HIVE_FILESOURCE_PARTITION_PRUNING)
+ def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)
def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 6857dd3..2d73d9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -197,7 +197,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
assertResult(expected.schema, s"Schema did not match for query #$i\n${expected.sql}") {
output.schema
}
- assertResult(expected.output, s"Result dit not match for query #$i\n${expected.sql}") {
+ assertResult(expected.output, s"Result did not match for query #$i\n${expected.sql}") {
output.output
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b989d01..9fb0f53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -95,7 +95,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
.add("b", "int"),
provider = Some("hive"),
partitionColumnNames = Seq("a", "b"),
- createTime = 0L)
+ createTime = 0L,
+ partitionProviderIsHive = true)
}
private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
@@ -923,68 +924,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
test("alter table: rename partition") {
- val catalog = spark.sessionState.catalog
- val tableIdent = TableIdentifier("tab1", Some("dbx"))
- createPartitionedTable(tableIdent, isDatasourceTable = false)
-
- // basic rename partition
- sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
- sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='20', b='c')")
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
-
- // rename without explicitly specifying database
- catalog.setCurrentDatabase("dbx")
- sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
-
- // table to alter does not exist
- intercept[NoSuchTableException] {
- sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
- }
-
- // partition to rename does not exist
- intercept[NoSuchPartitionException] {
- sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
- }
-
- // partition spec in RENAME PARTITION should be case insensitive by default
- sql("ALTER TABLE tab1 PARTITION (A='10', B='p') RENAME TO PARTITION (A='1', B='p')")
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+ testRenamePartitions(isDatasourceTable = false)
}
test("alter table: rename partition (datasource table)") {
- createPartitionedTable(TableIdentifier("tab1", Some("dbx")), isDatasourceTable = true)
- val e = intercept[AnalysisException] {
- sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
- }.getMessage
- assert(e.contains(
- "ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API"))
- // table to alter does not exist
- intercept[NoSuchTableException] {
- sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
- }
- }
-
- private def createPartitionedTable(
- tableIdent: TableIdentifier,
- isDatasourceTable: Boolean): Unit = {
- val catalog = spark.sessionState.catalog
- val part1 = Map("a" -> "1", "b" -> "q")
- val part2 = Map("a" -> "2", "b" -> "c")
- val part3 = Map("a" -> "3", "b" -> "p")
- createDatabase(catalog, "dbx")
- createTable(catalog, tableIdent)
- createTablePartition(catalog, part1, tableIdent)
- createTablePartition(catalog, part2, tableIdent)
- createTablePartition(catalog, part3, tableIdent)
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(part1, part2, part3))
- if (isDatasourceTable) {
- convertToDatasourceTable(catalog, tableIdent)
- }
+ testRenamePartitions(isDatasourceTable = true)
}
test("show tables") {
@@ -1199,7 +1143,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
if (spec.isDefined) {
assert(storageFormat.properties.isEmpty)
- assert(storageFormat.locationUri.isEmpty)
+ assert(storageFormat.locationUri === Some(expected))
} else {
assert(storageFormat.properties.get("path") === Some(expected))
assert(storageFormat.locationUri === Some(expected))
@@ -1212,18 +1156,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
verifyLocation("/path/to/your/lovely/heart")
// set table partition location
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
- }
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
verifyLocation("/path/to/part/ways", Some(partSpec))
// set table location without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET LOCATION '/swanky/steak/place'")
verifyLocation("/swanky/steak/place")
// set table partition location without explicitly specifying database
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
- }
+ sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
verifyLocation("vienna", Some(partSpec))
// table to alter does not exist
intercept[AnalysisException] {
@@ -1354,26 +1294,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
// basic add partition
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
- "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
- assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
- assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
- assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
- }
+ sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
+ "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+ assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
+ assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
+ assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
// add partitions without explicitly specifying database
catalog.setCurrentDatabase("dbx")
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(part1, part2, part3, part4))
- }
+ sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3, part4))
// table to alter does not exist
intercept[AnalysisException] {
@@ -1386,22 +1318,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
// partition to add already exists when using IF NOT EXISTS
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(part1, part2, part3, part4))
- }
+ sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3, part4))
// partition spec in ADD PARTITION should be case insensitive by default
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(part1, part2, part3, part4, part5))
- }
+ sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3, part4, part5))
}
private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
@@ -1424,21 +1348,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
// basic drop partition
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
- }
+ sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
// drop partitions without explicitly specifying database
catalog.setCurrentDatabase("dbx")
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
- }
+ sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
// table to alter does not exist
intercept[AnalysisException] {
@@ -1451,20 +1367,56 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
// partition to drop does not exist when using IF EXISTS
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
- }
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
- }
+ sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
// partition spec in DROP PARTITION should be case insensitive by default
- maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+ sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+ assert(catalog.listPartitions(tableIdent).isEmpty)
+ }
+
+ private def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
+ val catalog = spark.sessionState.catalog
+ val tableIdent = TableIdentifier("tab1", Some("dbx"))
+ val part1 = Map("a" -> "1", "b" -> "q")
+ val part2 = Map("a" -> "2", "b" -> "c")
+ val part3 = Map("a" -> "3", "b" -> "p")
+ createDatabase(catalog, "dbx")
+ createTable(catalog, tableIdent)
+ createTablePartition(catalog, part1, tableIdent)
+ createTablePartition(catalog, part2, tableIdent)
+ createTablePartition(catalog, part3, tableIdent)
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+ if (isDatasourceTable) {
+ convertToDatasourceTable(catalog, tableIdent)
+ }
+
+ // basic rename partition
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='20', b='c')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+
+ // rename without explicitly specifying database
+ catalog.setCurrentDatabase("dbx")
+ sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+
+ // table to alter does not exist
+ intercept[NoSuchTableException] {
+ sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
}
- if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).isEmpty)
+
+ // partition to rename does not exist
+ intercept[NoSuchPartitionException] {
+ sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
}
+
+ // partition spec in RENAME PARTITION should be case insensitive by default
+ sql("ALTER TABLE tab1 PARTITION (A='10', B='p') RENAME TO PARTITION (A='1', B='p')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
}
test("drop build-in function") {
@@ -1683,12 +1635,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- // truncating partitioned data source tables is not supported
withTable("rectangles", "rectangles2") {
data.write.saveAsTable("rectangles")
data.write.partitionBy("length").saveAsTable("rectangles2")
+
+ // not supported since the table is not partitioned
assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
- assertUnsupported("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+
+ // supported since partitions are stored in the metastore
+ sql("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+ assert(spark.table("rectangles2").collect().isEmpty)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2003ff4..409c316 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -105,13 +106,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* metastore.
*/
private def verifyTableProperties(table: CatalogTable): Unit = {
- val invalidKeys = table.properties.keys.filter { key =>
- key.startsWith(DATASOURCE_PREFIX) || key.startsWith(STATISTICS_PREFIX)
- }
+ val invalidKeys = table.properties.keys.filter(_.startsWith(SPARK_SQL_PREFIX))
if (invalidKeys.nonEmpty) {
throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " +
- s"as table property keys may not start with '$DATASOURCE_PREFIX' or '$STATISTICS_PREFIX':" +
- s" ${invalidKeys.mkString("[", ", ", "]")}")
+ s"as table property keys may not start with '$SPARK_SQL_PREFIX': " +
+ invalidKeys.mkString("[", ", ", "]"))
}
// External users are not allowed to set/switch the table type. In Hive metastore, the table
// type can be switched by changing the value of a case-sensitive table property `EXTERNAL`.
@@ -190,11 +189,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
throw new TableAlreadyExistsException(db = db, table = table)
}
// Before saving data source table metadata into Hive metastore, we should:
- // 1. Put table schema, partition column names and bucket specification in table properties.
+ // 1. Put table provider, schema, partition column names, bucket specification and partition
+ // provider in table properties.
// 2. Check if this table is hive compatible
// 2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty
// and save table metadata to Hive.
- // 2.1 If it's hive compatible, set serde information in table metadata and try to save
+ // 2.2 If it's hive compatible, set serde information in table metadata and try to save
// it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
if (DDLUtils.isDatasourceTable(tableDefinition)) {
// data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
@@ -204,6 +204,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val tableProperties = new scala.collection.mutable.HashMap[String, String]
tableProperties.put(DATASOURCE_PROVIDER, provider)
+ if (tableDefinition.partitionProviderIsHive) {
+ tableProperties.put(TABLE_PARTITION_PROVIDER, "hive")
+ }
// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
@@ -241,12 +244,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}
- // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column
- // names and bucket specification to empty.
+ // converts the table metadata to Spark SQL specific format, i.e. set data schema and bucket
+ // specification to empty. Note that partition columns are retained, so that we can call
+ // partition-related Hive API later.
def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
tableDefinition.copy(
- schema = new StructType,
- partitionColumnNames = Nil,
+ schema = tableDefinition.partitionSchema,
bucketSpec = None,
properties = tableDefinition.properties ++ tableProperties)
}
@@ -419,12 +422,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
// to retain the spark specific format if it is. Also add old data source properties to table
// properties, to retain the data source table format.
- val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
+ val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX))
+ val partitionProviderProp = if (tableDefinition.partitionProviderIsHive) {
+ TABLE_PARTITION_PROVIDER -> "hive"
+ } else {
+ TABLE_PARTITION_PROVIDER -> "builtin"
+ }
val newDef = withStatsProps.copy(
schema = oldDef.schema,
partitionColumnNames = oldDef.partitionColumnNames,
bucketSpec = oldDef.bucketSpec,
- properties = oldDataSourceProps ++ withStatsProps.properties)
+ properties = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp)
client.alterTable(newDef)
} else {
@@ -448,7 +456,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* properties, and filter out these special entries from table properties.
*/
private def restoreTableMetadata(table: CatalogTable): CatalogTable = {
- val catalogTable = if (table.tableType == VIEW || conf.get(DEBUG_MODE)) {
+ if (conf.get(DEBUG_MODE)) {
+ return table
+ }
+
+ val tableWithSchema = if (table.tableType == VIEW) {
table
} else {
getProviderFromTableProperties(table).map { provider =>
@@ -473,30 +485,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
provider = Some(provider),
partitionColumnNames = getPartitionColumnsFromTableProperties(table),
bucketSpec = getBucketSpecFromTableProperties(table),
- properties = getOriginalTableProperties(table))
+ partitionProviderIsHive = table.properties.get(TABLE_PARTITION_PROVIDER) == Some("hive"))
} getOrElse {
- table.copy(provider = Some("hive"))
+ table.copy(provider = Some("hive"), partitionProviderIsHive = true)
}
}
+
// construct Spark's statistics from information in Hive metastore
- val statsProps = catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
- if (statsProps.nonEmpty) {
+ val statsProps = tableWithSchema.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+ val tableWithStats = if (statsProps.nonEmpty) {
val colStatsProps = statsProps.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX))
.map { case (k, v) => (k.drop(STATISTICS_COL_STATS_PREFIX.length), v) }
- val colStats: Map[String, ColumnStat] = catalogTable.schema.collect {
+ val colStats: Map[String, ColumnStat] = tableWithSchema.schema.collect {
case f if colStatsProps.contains(f.name) =>
val numFields = ColumnStatStruct.numStatFields(f.dataType)
(f.name, ColumnStat(numFields, colStatsProps(f.name)))
}.toMap
- catalogTable.copy(
- properties = removeStatsProperties(catalogTable),
+ tableWithSchema.copy(
stats = Some(Statistics(
- sizeInBytes = BigInt(catalogTable.properties(STATISTICS_TOTAL_SIZE)),
- rowCount = catalogTable.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+ sizeInBytes = BigInt(tableWithSchema.properties(STATISTICS_TOTAL_SIZE)),
+ rowCount = tableWithSchema.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
colStats = colStats)))
} else {
- catalogTable
+ tableWithSchema
}
+
+ tableWithStats.copy(properties = getOriginalTableProperties(table))
}
override def tableExists(db: String, table: String): Boolean = withClient {
@@ -581,13 +595,30 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Partitions
// --------------------------------------------------------------------------
+ // Hive metastore is not case preserving and the partition columns are always lower cased. We need
+ // to lower case the column names in partition specification before calling partition related Hive
+ // APIs, to match this behaviour.
+ private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
+ spec.map { case (k, v) => k.toLowerCase -> v }
+ }
+
+ // Hive metastore is not case preserving and the column names of the partition specification we
+ // get from the metastore are always lower cased. We should restore them w.r.t. the actual table
+ // partition columns.
+ private def restorePartitionSpec(
+ spec: TablePartitionSpec,
+ partCols: Seq[String]): TablePartitionSpec = {
+ spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
+ }
+
override def createPartitions(
db: String,
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = withClient {
requireTableExists(db, table)
- client.createPartitions(db, table, parts, ignoreIfExists)
+ val lowerCasedParts = parts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+ client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
}
override def dropPartitions(
@@ -597,7 +628,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = withClient {
requireTableExists(db, table)
- client.dropPartitions(db, table, parts, ignoreIfNotExists, purge)
+ client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
}
override def renamePartitions(
@@ -605,21 +636,24 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
- client.renamePartitions(db, table, specs, newSpecs)
+ client.renamePartitions(
+ db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
}
override def alterPartitions(
db: String,
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withClient {
- client.alterPartitions(db, table, newParts)
+ val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+ client.alterPartitions(db, table, lowerCasedParts)
}
override def getPartition(
db: String,
table: String,
spec: TablePartitionSpec): CatalogTablePartition = withClient {
- client.getPartition(db, table, spec)
+ val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+ part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
}
/**
@@ -629,7 +663,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
- client.getPartitionOption(db, table, spec)
+ client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
+ part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+ }
}
/**
@@ -639,14 +675,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
- client.getPartitions(db, table, partialSpec)
+ client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
+ part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+ }
}
override def listPartitionsByFilter(
db: String,
table: String,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient {
- val catalogTable = client.getTable(db, table)
+ val rawTable = client.getTable(db, table)
+ val catalogTable = restoreTableMetadata(rawTable)
val partitionColumnNames = catalogTable.partitionColumnNames.toSet
val nonPartitionPruningPredicates = predicates.filterNot {
_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
@@ -660,19 +699,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val partitionSchema = catalogTable.partitionSchema
if (predicates.nonEmpty) {
- val clientPrunedPartitions =
- client.getPartitionsByFilter(catalogTable, predicates)
+ val clientPrunedPartitions = client.getPartitionsByFilter(rawTable, predicates).map { part =>
+ part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+ }
val boundPredicate =
InterpretedPredicate.create(predicates.reduce(And).transform {
case att: AttributeReference =>
val index = partitionSchema.indexWhere(_.name == att.name)
BoundReference(index, partitionSchema(index).dataType, nullable = true)
})
- clientPrunedPartitions.filter { case p: CatalogTablePartition =>
- boundPredicate(p.toRow(partitionSchema))
- }
+ clientPrunedPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
} else {
- client.getPartitions(catalogTable)
+ client.getPartitions(catalogTable).map { part =>
+ part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+ }
}
}
@@ -722,7 +762,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
object HiveExternalCatalog {
- val DATASOURCE_PREFIX = "spark.sql.sources."
+ val SPARK_SQL_PREFIX = "spark.sql."
+
+ val DATASOURCE_PREFIX = SPARK_SQL_PREFIX + "sources."
val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
@@ -736,21 +778,20 @@ object HiveExternalCatalog {
val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
- val STATISTICS_PREFIX = "spark.sql.statistics."
+ val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics."
val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
- def removeStatsProperties(metadata: CatalogTable): Map[String, String] = {
- metadata.properties.filterNot { case (key, _) => key.startsWith(STATISTICS_PREFIX) }
- }
+ val TABLE_PARTITION_PROVIDER = SPARK_SQL_PREFIX + "partitionProvider"
+
def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {
metadata.properties.get(DATASOURCE_PROVIDER)
}
def getOriginalTableProperties(metadata: CatalogTable): Map[String, String] = {
- metadata.properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_PREFIX) }
+ metadata.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }
}
// A persisted data source table always store its schema in the catalog.
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6c1585d..d1de863 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -76,11 +76,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
className = table.provider.get,
- options = table.storage.properties)
+ options = table.storage.properties,
+ catalogTable = Some(table))
- LogicalRelation(
- dataSource.resolveRelation(),
- catalogTable = Some(table))
+ LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
}
}
@@ -194,7 +193,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
val bucketSpec = None // We don't support hive bucketed tables, only ones we write out.
- val lazyPruningEnabled = sparkSession.sqlContext.conf.filesourcePartitionPruning
+ val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 8835b26..84873bb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -777,7 +777,7 @@ private[hive] class HiveClientImpl(
val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
table.partitionColumnNames.contains(c.getName)
}
- if (table.schema.isEmpty) {
+ if (schema.isEmpty) {
// This is a hack to preserve existing behavior. Before Spark 2.0, we do not
// set a default serde here (this was done in Hive), and so if the user provides
// an empty schema Hive would automatically populate the schema with a single
@@ -831,9 +831,6 @@ private[hive] class HiveClientImpl(
new HivePartition(ht, tpart)
}
- // TODO (cloud-fan): the column names in partition specification are always lower cased because
- // Hive metastore is not case preserving. We should normalize them to the actual column names of
- // the table, once we store partition spec of data source tables.
private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
val apiPartition = hp.getTPartition
CatalogTablePartition(
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index d290fe9..6e887d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -63,7 +63,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
def testCaching(pruningEnabled: Boolean): Unit = {
test(s"partitioned table is cached when partition pruning is $pruningEnabled") {
- withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> pruningEnabled.toString) {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> pruningEnabled.toString) {
withTable("test") {
withTempDir { dir =>
spark.range(5).selectExpr("id", "id as f1", "id as f2").write
http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
deleted file mode 100644
index 82ee813..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.File
-
-import org.scalatest.BeforeAndAfterEach
-
-import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.execution.datasources.FileStatusCache
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
-
-class HiveTablePerfStatsSuite
- extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
-
- override def beforeEach(): Unit = {
- super.beforeEach()
- FileStatusCache.resetForTesting()
- }
-
- override def afterEach(): Unit = {
- super.afterEach()
- FileStatusCache.resetForTesting()
- }
-
- private def setupPartitionedTable(tableName: String, dir: File): Unit = {
- spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write
- .partitionBy("partCol1", "partCol2")
- .mode("overwrite")
- .parquet(dir.getAbsolutePath)
-
- spark.sql(s"""
- |create external table $tableName (id long)
- |partitioned by (partCol1 int, partCol2 int)
- |stored as parquet
- |location "${dir.getAbsolutePath}"""".stripMargin)
- spark.sql(s"msck repair table $tableName")
- }
-
- test("partitioned pruned table reports only selected files") {
- assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
- withTable("test") {
- withTempDir { dir =>
- setupPartitionedTable("test", dir)
- val df = spark.sql("select * from test")
- assert(df.count() == 5)
- assert(df.inputFiles.length == 5) // unpruned
-
- val df2 = spark.sql("select * from test where partCol1 = 3 or partCol2 = 4")
- assert(df2.count() == 2)
- assert(df2.inputFiles.length == 2) // pruned, so we have less files
-
- val df3 = spark.sql("select * from test where PARTCOL1 = 3 or partcol2 = 4")
- assert(df3.count() == 2)
- assert(df3.inputFiles.length == 2)
-
- val df4 = spark.sql("select * from test where partCol1 = 999")
- assert(df4.count() == 0)
- assert(df4.inputFiles.length == 0)
- }
- }
- }
-
- test("lazy partition pruning reads only necessary partition data") {
- withSQLConf(
- SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true",
- SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "0") {
- withTable("test") {
- withTempDir { dir =>
- setupPartitionedTable("test", dir)
- HiveCatalogMetrics.reset()
- spark.sql("select * from test where partCol1 = 999").count()
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-
- HiveCatalogMetrics.reset()
- spark.sql("select * from test where partCol1 < 2").count()
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
-
- HiveCatalogMetrics.reset()
- spark.sql("select * from test where partCol1 < 3").count()
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 3)
-
- // should read all
- HiveCatalogMetrics.reset()
- spark.sql("select * from test").count()
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-
- // read all should not be cached
- HiveCatalogMetrics.reset()
- spark.sql("select * from test").count()
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-
- // cache should be disabled
- assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
- }
- }
- }
- }
-
- test("lazy partition pruning with file status caching enabled") {
- withSQLConf(
- "spark.sql.hive.filesourcePartitionPruning" -> "true",
- "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
- withTable("test") {
- withTempDir { dir =>
- setupPartitionedTable("test", dir)
- HiveCatalogMetrics.reset()
- assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
- assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
-
- HiveCatalogMetrics.reset()
- assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
- assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
-
- HiveCatalogMetrics.reset()
- assert(spark.sql("select * from test where partCol1 < 3").count() == 3)
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
- assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 2)
-
- HiveCatalogMetrics.reset()
- assert(spark.sql("select * from test").count() == 5)
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
- assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 3)
-
- HiveCatalogMetrics.reset()
- assert(spark.sql("select * from test").count() == 5)
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
- assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 5)
- }
- }
- }
- }
-
- test("file status caching respects refresh table and refreshByPath") {
- withSQLConf(
- "spark.sql.hive.filesourcePartitionPruning" -> "true",
- "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
- withTable("test") {
- withTempDir { dir =>
- setupPartitionedTable("test", dir)
- HiveCatalogMetrics.reset()
- assert(spark.sql("select * from test").count() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
-
- HiveCatalogMetrics.reset()
- spark.sql("refresh table test")
- assert(spark.sql("select * from test").count() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
-
- spark.catalog.cacheTable("test")
- HiveCatalogMetrics.reset()
- spark.catalog.refreshByPath(dir.getAbsolutePath)
- assert(spark.sql("select * from test").count() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
- }
- }
- }
- }
-
- test("file status cache respects size limit") {
- withSQLConf(
- "spark.sql.hive.filesourcePartitionPruning" -> "true",
- "spark.sql.hive.filesourcePartitionFileCacheSize" -> "1" /* 1 byte */) {
- withTable("test") {
- withTempDir { dir =>
- setupPartitionedTable("test", dir)
- HiveCatalogMetrics.reset()
- assert(spark.sql("select * from test").count() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
- assert(spark.sql("select * from test").count() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10)
- assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
- }
- }
- }
- }
-
- test("all partitions read and cached when filesource partition pruning is off") {
- withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
- withTable("test") {
- withTempDir { dir =>
- setupPartitionedTable("test", dir)
-
- // We actually query the partitions from hive each time the table is resolved in this
- // mode. This is kind of terrible, but is needed to preserve the legacy behavior
- // of doing plan cache validation based on the entire partition set.
- HiveCatalogMetrics.reset()
- assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
- // 5 from table resolution, another 5 from ListingFileCatalog
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-
- HiveCatalogMetrics.reset()
- assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-
- HiveCatalogMetrics.reset()
- assert(spark.sql("select * from test").count() == 5)
- assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
- assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
- }
- }
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org