You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2018/10/08 15:56:25 UTC
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/20999#discussion_r223415516
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand(
*/
case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
- specs: Seq[TablePartitionSpec],
+ partitionsFilters: Seq[Seq[Expression]],
ifExists: Boolean,
purge: Boolean,
retainData: Boolean)
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
+ val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
val table = catalog.getTableMetadata(tableName)
+ val partitionColumns = table.partitionColumnNames
+ val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
- val normalizedSpecs = specs.map { spec =>
- PartitioningUtils.normalizePartitionSpec(
- spec,
- table.partitionColumnNames,
- table.identifier.quotedString,
- sparkSession.sessionState.conf.resolver)
+ val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+ if (hasComplexFilters(filtersSpec)) {
+ generatePartitionSpec(filtersSpec,
+ partitionColumns,
+ partitionAttributes,
+ table.identifier,
+ catalog,
+ sparkSession.sessionState.conf.resolver,
+ timeZone,
+ ifExists)
+ } else {
+ val partitionSpec = filtersSpec.map {
+ case EqualTo(key: Attribute, Literal(value, StringType)) =>
+ key.name -> value.toString
+ }.toMap
+ PartitioningUtils.normalizePartitionSpec(
+ partitionSpec,
+ partitionColumns,
+ table.identifier.quotedString,
+ sparkSession.sessionState.conf.resolver) :: Nil
+ }
}
catalog.dropPartitions(
- table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+ table.identifier, resolvedSpecs, ignoreIfNotExists = ifExists, purge = purge,
retainData = retainData)
CommandUtils.updateTableStats(sparkSession, table)
Seq.empty[Row]
}
+ def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = {
+ partitionFilterSpec.exists(!_.isInstanceOf[EqualTo])
+ }
+
+ def generatePartitionSpec(
+ partitionFilterSpec: Seq[Expression],
+ partitionColumns: Seq[String],
+ partitionAttributes: Map[String, Attribute],
+ tableIdentifier: TableIdentifier,
+ catalog: SessionCatalog,
+ resolver: Resolver,
+ timeZone: Option[String],
+ ifExists: Boolean): Seq[TablePartitionSpec] = {
+ val filters = partitionFilterSpec.map { pFilter =>
+ pFilter.transform {
+ // Resolve the partition attributes
+ case partitionCol: Attribute =>
+ val normalizedPartition = PartitioningUtils.normalizePartitionColumn(
+ partitionCol.name,
+ partitionColumns,
+ tableIdentifier.quotedString,
+ resolver)
+ partitionAttributes(normalizedPartition)
+ }.transform {
+ // Cast the partition value to the data type of the corresponding partition attribute
+ case cmp @ BinaryComparison(partitionAttr, value)
+ if !partitionAttr.dataType.sameType(value.dataType) =>
+ cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone)))
--- End diff --
hmm, have you tested the `Cast` cases? I looked at the `convertFilters` method in HiveShim.scala, and it seems we don't convert `Cast` in the pushdown predicates to Hive.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org