You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/11/15 23:59:09 UTC

spark git commit: [SPARK-17732][SQL] ALTER TABLE DROP PARTITION should support comparators

Repository: spark
Updated Branches:
  refs/heads/master 503378f10 -> 3ce057d00


[SPARK-17732][SQL] ALTER TABLE DROP PARTITION should support comparators

## What changes were proposed in this pull request?

This PR restores support for comparison operators (e.g. '<', '<=', '>', '>=') in `ALTER TABLE ... DROP PARTITION` partition specs in Apache Spark 2.0, for backward compatibility with Spark 1.6.

**Spark 1.6**

``` scala
scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
res0: org.apache.spark.sql.DataFrame = [result: string]

scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")
res1: org.apache.spark.sql.DataFrame = [result: string]
```

**Spark 2.0**

``` scala
scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
res0: org.apache.spark.sql.DataFrame = []

scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<' expecting {')', ','}(line 1, pos 42)
```

After this PR, the `DROP PARTITION` statement above is accepted again instead of failing with a `ParseException`.

## How was this patch tested?

Pass the Jenkins tests with newly added test cases.

Author: Dongjoon Hyun <do...@apache.org>

Closes #15704 from dongjoon-hyun/SPARK-17732-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ce057d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ce057d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ce057d0

Branch: refs/heads/master
Commit: 3ce057d0010a0f6f8163046ba502a126adc68f33
Parents: 503378f
Author: Dongjoon Hyun <do...@apache.org>
Authored: Tue Nov 15 15:59:04 2016 -0800
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Tue Nov 15 15:59:04 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   6 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  30 +++++-
 .../spark/sql/execution/SparkSqlParser.scala    |   2 +-
 .../spark/sql/execution/command/ddl.scala       |  51 +++++++--
 .../datasources/DataSourceStrategy.scala        |   8 +-
 .../sql/execution/command/DDLCommandSuite.scala |   9 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 103 +++++++++++++++++++
 7 files changed, 185 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3ce057d0/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index b599a88..fcca11c 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -239,11 +239,7 @@ partitionSpecLocation
     ;
 
 partitionSpec
-    : PARTITION '(' partitionVal (',' partitionVal)* ')'
-    ;
-
-partitionVal
-    : identifier (EQ constant)?
+    : PARTITION '(' expression (',' expression)* ')'
     ;
 
 describeFuncName

http://git-wip-us.apache.org/repos/asf/spark/blob/3ce057d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3fa7bf1..8e8d374 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -194,10 +194,15 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitPartitionSpec(
       ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
-    val parts = ctx.partitionVal.asScala.map { pVal =>
-      val name = pVal.identifier.getText
-      val value = Option(pVal.constant).map(visitStringConstant)
-      name -> value
+    val parts = ctx.expression.asScala.map { pVal =>
+      expression(pVal) match {
+        case UnresolvedAttribute(name :: Nil) =>
+          name -> None
+        case cmp @ EqualTo(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+          name -> Option(constant.toString)
+        case _ =>
+          throw new ParseException("Invalid partition filter specification", ctx)
+      }
     }
     // Before calling `toMap`, we check duplicated keys to avoid silently ignore partition values
     // in partition spec like PARTITION(a='1', b='2', a='3'). The real semantical check for
@@ -207,6 +212,23 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
   }
 
   /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) {
+    val parts = ctx.expression.asScala.map { pVal =>
+      expression(pVal) match {
+        case EqualNullSafe(_, _) =>
+          throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
+        case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+          cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant))
+        case _ =>
+          throw new ParseException("Invalid partition filter specification", ctx)
+      }
+    }
+    parts.reduceLeft(And)
+  }
+
+  /**
    * Create a partition specification map without optional values.
    */
   protected def visitNonOptionalPartitionSpec(

http://git-wip-us.apache.org/repos/asf/spark/blob/3ce057d0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index b8be3d1..112d812 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -813,7 +813,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+      ctx.partitionSpec.asScala.map(visitPartitionFilterSpec),
       ctx.EXISTS != null,
       ctx.PURGE != null)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3ce057d0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 84a63fd..6c1c398 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison}
+import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, PredicateHelper}
 import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -418,27 +419,55 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    specs: Seq[Expression],
     ifExists: Boolean,
     purge: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
+
+  private def isRangeComparison(expr: Expression): Boolean = {
+    expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined
+  }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    specs.foreach { expr =>
+      expr.references.foreach { attr =>
+        if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
+          throw new AnalysisException(s"${attr.name} is not a valid partition column " +
+            s"in table ${table.identifier.quotedString}.")
+        }
+      }
     }
 
-    catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+    if (specs.exists(isRangeComparison)) {
+      val partitionSet = specs.flatMap { spec =>
+        val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(spec)).map(_.spec)
+        if (partitions.isEmpty && !ifExists) {
+          throw new AnalysisException(s"There is no partition for ${spec.sql}")
+        }
+        partitions
+      }.distinct
+      catalog.dropPartitions(
+        table.identifier, partitionSet, ignoreIfNotExists = ifExists, purge = purge)
+    } else {
+      val normalizedSpecs = specs.map { expr =>
+        val spec = splitConjunctivePredicates(expr).map {
+          case BinaryComparison(AttributeReference(name, _, _, _), right) => name -> right.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          spec,
+          table.partitionColumnNames,
+          table.identifier.quotedString,
+          resolver)
+      }
+      catalog.dropPartitions(
+        table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+    }
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3ce057d0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 4f19a2d..e81512d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -215,8 +215,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
           if (overwrite.enabled) {
             val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
             if (deletedPartitions.nonEmpty) {
+              import org.apache.spark.sql.catalyst.expressions._
+              val expressions = deletedPartitions.map { specs =>
+                specs.map { case (key, value) =>
+                  EqualTo(AttributeReference(key, StringType)(), Literal.create(value, StringType))
+                }.reduceLeft(And)
+              }.toSeq
               AlterTableDropPartitionCommand(
-                l.catalogTable.get.identifier, deletedPartitions.toSeq,
+                l.catalogTable.get.identifier, expressions,
                 ifExists = true, purge = true).run(t.sparkSession)
             }
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/3ce057d0/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index d31e7ae..057528b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -21,6 +21,7 @@ import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -612,8 +613,12 @@ class DDLCommandSuite extends PlanTest {
     val expected1_table = AlterTableDropPartitionCommand(
       tableIdent,
       Seq(
-        Map("dt" -> "2008-08-08", "country" -> "us"),
-        Map("dt" -> "2009-09-09", "country" -> "uk")),
+        And(
+          EqualTo(AttributeReference("dt", StringType)(), Literal.create("2008-08-08", StringType)),
+          EqualTo(AttributeReference("country", StringType)(), Literal.create("us", StringType))),
+        And(
+          EqualTo(AttributeReference("dt", StringType)(), Literal.create("2009-09-09", StringType)),
+          EqualTo(AttributeReference("country", StringType)(), Literal.create("uk", StringType)))),
       ifExists = true,
       purge = false)
     val expected2_table = expected1_table.copy(ifExists = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/3ce057d0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 6efae13..a2b0486 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -225,6 +226,108 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-17732: Drop partitions by filter") {
+    withTable("sales") {
+      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
+
+      for (country <- Seq("US", "CA", "KR")) {
+        for (quarter <- 1 to 4) {
+          sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
+        }
+      }
+
+      sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=CA/quarter=1") ::
+        Row("country=CA/quarter=2") ::
+        Row("country=KR/quarter=1") ::
+        Row("country=KR/quarter=2") ::
+        Row("country=KR/quarter=3") ::
+        Row("country=KR/quarter=4") ::
+        Row("country=US/quarter=1") ::
+        Row("country=US/quarter=2") ::
+        Row("country=US/quarter=3") ::
+        Row("country=US/quarter=4") :: Nil)
+
+      sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=KR/quarter=2") ::
+        Row("country=KR/quarter=3") ::
+        Row("country=KR/quarter=4") ::
+        Row("country=US/quarter=2") ::
+        Row("country=US/quarter=3") ::
+        Row("country=US/quarter=4") :: Nil)
+
+      sql("ALTER TABLE sales DROP PARTITION (country='KR', quarter='4')")
+      sql("ALTER TABLE sales DROP PARTITION (country='US', quarter='3')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=KR/quarter=2") ::
+        Row("country=KR/quarter=3") ::
+        Row("country=US/quarter=2") ::
+        Row("country=US/quarter=4") :: Nil)
+
+      sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=KR/quarter=3") :: Nil)
+
+      // According to the declarative partition spec definitions, this drops the union of target
+      // partitions without exceptions. Hive raises exceptions because it handles them sequentially.
+      sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')")
+      checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
+    }
+  }
+
+  test("SPARK-17732: Error handling for drop partitions by filter") {
+    withTable("sales") {
+      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
+
+      val m = intercept[AnalysisException] {
+        sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')")
+      }.getMessage
+      assert(m.contains("unknown is not a valid partition column in table"))
+
+      val m2 = intercept[AnalysisException] {
+        sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')")
+      }.getMessage
+      assert(m2.contains("unknown is not a valid partition column in table"))
+
+      val m3 = intercept[AnalysisException] {
+        sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')")
+      }.getMessage
+      assert(m3.contains("'<=>' operator is not allowed in partition specification"))
+
+      val m4 = intercept[ParseException] {
+        sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))")
+      }.getMessage
+      assert(m4.contains("'<=>' operator is not allowed in partition specification"))
+
+      val m5 = intercept[ParseException] {
+        sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)")
+      }.getMessage
+      assert(m5.contains("Invalid partition filter specification"))
+
+      sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')")
+      val m6 = intercept[AnalysisException] {
+        sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')")
+      }.getMessage
+      // The query is not executed because `PARTITION (quarter <= '2')` is invalid.
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=KR/quarter=3") :: Nil)
+      assert(m6.contains("There is no partition for (`quarter` <= '2')"))
+    }
+  }
+
+  test("SPARK-17732: Partition filter is not allowed in ADD PARTITION") {
+    withTable("sales") {
+      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
+
+      val m = intercept[ParseException] {
+        sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')")
+      }.getMessage()
+      assert(m.contains("Invalid partition filter specification"))
+    }
+  }
+
   test("drop views") {
     withTable("tab1") {
       val tabName = "tab1"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org