Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2018/10/08 15:56:25 UTC

[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20999#discussion_r223415516
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
    @@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand(
      */
     case class AlterTableDropPartitionCommand(
         tableName: TableIdentifier,
    -    specs: Seq[TablePartitionSpec],
    +    partitionsFilters: Seq[Seq[Expression]],
         ifExists: Boolean,
         purge: Boolean,
         retainData: Boolean)
       extends RunnableCommand {
     
       override def run(sparkSession: SparkSession): Seq[Row] = {
         val catalog = sparkSession.sessionState.catalog
    +    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
         val table = catalog.getTableMetadata(tableName)
    +    val partitionColumns = table.partitionColumnNames
    +    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
         DDLUtils.verifyAlterTableType(catalog, table, isView = false)
         DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
     
    -    val normalizedSpecs = specs.map { spec =>
    -      PartitioningUtils.normalizePartitionSpec(
    -        spec,
    -        table.partitionColumnNames,
    -        table.identifier.quotedString,
    -        sparkSession.sessionState.conf.resolver)
    +    val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
    +      if (hasComplexFilters(filtersSpec)) {
    +        generatePartitionSpec(filtersSpec,
    +          partitionColumns,
    +          partitionAttributes,
    +          table.identifier,
    +          catalog,
    +          sparkSession.sessionState.conf.resolver,
    +          timeZone,
    +          ifExists)
    +      } else {
    +        val partitionSpec = filtersSpec.map {
    +          case EqualTo(key: Attribute, Literal(value, StringType)) =>
    +            key.name -> value.toString
    +        }.toMap
    +        PartitioningUtils.normalizePartitionSpec(
    +          partitionSpec,
    +          partitionColumns,
    +          table.identifier.quotedString,
    +          sparkSession.sessionState.conf.resolver) :: Nil
    +      }
         }
     
         catalog.dropPartitions(
    -      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
    +      table.identifier, resolvedSpecs, ignoreIfNotExists = ifExists, purge = purge,
           retainData = retainData)
     
         CommandUtils.updateTableStats(sparkSession, table)
     
         Seq.empty[Row]
       }
     
    +  def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = {
    +    partitionFilterSpec.exists(!_.isInstanceOf[EqualTo])
    +  }
    +
    +  def generatePartitionSpec(
    +      partitionFilterSpec: Seq[Expression],
    +      partitionColumns: Seq[String],
    +      partitionAttributes: Map[String, Attribute],
    +      tableIdentifier: TableIdentifier,
    +      catalog: SessionCatalog,
    +      resolver: Resolver,
    +      timeZone: Option[String],
    +      ifExists: Boolean): Seq[TablePartitionSpec] = {
    +    val filters = partitionFilterSpec.map { pFilter =>
    +      pFilter.transform {
    +        // Resolve the partition attributes
    +        case partitionCol: Attribute =>
    +          val normalizedPartition = PartitioningUtils.normalizePartitionColumn(
    +            partitionCol.name,
    +            partitionColumns,
    +            tableIdentifier.quotedString,
    +            resolver)
    +          partitionAttributes(normalizedPartition)
    +      }.transform {
    +        // Cast the partition value to the data type of the corresponding partition attribute
    +        case cmp @ BinaryComparison(partitionAttr, value)
    +            if !partitionAttr.dataType.sameType(value.dataType) =>
    +          cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone)))
    --- End diff --
    
    hmm, have you tested the `Cast` cases? I looked at the `convertFilters` method in HiveShim.scala, and it seems we don't convert `Cast` in the pushdown predicates to Hive.
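    
    To illustrate the concern with a minimal sketch (this is not the actual `convertFilters` code, and `toHiveFilterString` is a made-up name): a filter conversion that only recognizes comparisons between a bare partition `Attribute` and a `Literal` will skip any predicate whose value side has been wrapped in a `Cast`, so that filter would never be pushed to the metastore.
    
    ```scala
    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.types._
    
    // Simplified stand-in for a convertFilters-style translation that only
    // matches a bare Attribute compared against a Literal.
    def toHiveFilterString(filters: Seq[Expression]): Seq[String] =
      filters.flatMap {
        case EqualTo(a: Attribute, Literal(v, _))  => Some(s"${a.name} = $v")
        case LessThan(a: Attribute, Literal(v, _)) => Some(s"${a.name} < $v")
        // A value wrapped in a Cast, e.g.
        // LessThan(attr, Cast(Literal("2018-01-01"), DateType)),
        // matches neither case above and is silently dropped.
        case _ => None
      }
    
    val dt = AttributeReference("dt", DateType)()
    // The transform in this PR rewrites dt < '2018-01-01' into a comparison
    // against Cast(Literal("2018-01-01"), DateType):
    val converted = toHiveFilterString(
      Seq(LessThan(dt, Cast(Literal("2018-01-01"), DateType))))
    // converted is empty, i.e. nothing would be pushed down to Hive.
    ```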


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org