You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/11/20 20:46:39 UTC
spark git commit: [SPARK-17732][SQL] Revert "ALTER TABLE DROP PARTITION should support comparators"
Repository: spark
Updated Branches:
refs/heads/branch-2.1 bc3e7b3b8 -> cffaf5035
[SPARK-17732][SQL] Revert "ALTER TABLE DROP PARTITION should support comparators"
This reverts commit 1126c3194ee1c79015cf1d3808bc963aa93dcadf.
Author: Herman van Hovell <hv...@databricks.com>
Closes #15948 from hvanhovell/SPARK-17732.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cffaf503
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cffaf503
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cffaf503
Branch: refs/heads/branch-2.1
Commit: cffaf5035816fa6ffc4dadd47bede1eff6371fee
Parents: bc3e7b3
Author: Herman van Hovell <hv...@databricks.com>
Authored: Sun Nov 20 12:46:29 2016 -0800
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Sun Nov 20 12:46:29 2016 -0800
----------------------------------------------------------------------
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 6 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 30 +-----
.../spark/sql/execution/SparkSqlParser.scala | 2 +-
.../spark/sql/execution/command/ddl.scala | 51 ++-------
.../datasources/DataSourceStrategy.scala | 8 +-
.../sql/execution/command/DDLCommandSuite.scala | 9 +-
.../spark/sql/hive/execution/HiveDDLSuite.scala | 103 -------------------
7 files changed, 24 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cffaf503/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index fcca11c..b599a88 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -239,7 +239,11 @@ partitionSpecLocation
;
partitionSpec
- : PARTITION '(' expression (',' expression)* ')'
+ : PARTITION '(' partitionVal (',' partitionVal)* ')'
+ ;
+
+partitionVal
+ : identifier (EQ constant)?
;
describeFuncName
http://git-wip-us.apache.org/repos/asf/spark/blob/cffaf503/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 97056bb..2006844 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -194,15 +194,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
*/
override def visitPartitionSpec(
ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
- val parts = ctx.expression.asScala.map { pVal =>
- expression(pVal) match {
- case UnresolvedAttribute(name :: Nil) =>
- name -> None
- case cmp @ EqualTo(UnresolvedAttribute(name :: Nil), constant: Literal) =>
- name -> Option(constant.toString)
- case _ =>
- throw new ParseException("Invalid partition filter specification", ctx)
- }
+ val parts = ctx.partitionVal.asScala.map { pVal =>
+ val name = pVal.identifier.getText
+ val value = Option(pVal.constant).map(visitStringConstant)
+ name -> value
}
// Before calling `toMap`, we check duplicated keys to avoid silently ignore partition values
// in partition spec like PARTITION(a='1', b='2', a='3'). The real semantical check for
@@ -212,23 +207,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
}
/**
- * Create a partition filter specification.
- */
- def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) {
- val parts = ctx.expression.asScala.map { pVal =>
- expression(pVal) match {
- case EqualNullSafe(_, _) =>
- throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
- case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
- cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant))
- case _ =>
- throw new ParseException("Invalid partition filter specification", ctx)
- }
- }
- parts.reduceLeft(And)
- }
-
- /**
* Create a partition specification map without optional values.
*/
protected def visitNonOptionalPartitionSpec(
http://git-wip-us.apache.org/repos/asf/spark/blob/cffaf503/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 112d812..b8be3d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -813,7 +813,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
AlterTableDropPartitionCommand(
visitTableIdentifier(ctx.tableIdentifier),
- ctx.partitionSpec.asScala.map(visitPartitionFilterSpec),
+ ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
ctx.EXISTS != null,
ctx.PURGE != null)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cffaf503/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 588aa05..570a996 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -31,8 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison}
-import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
@@ -419,55 +418,27 @@ case class AlterTableRenamePartitionCommand(
*/
case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
- specs: Seq[Expression],
+ specs: Seq[TablePartitionSpec],
ifExists: Boolean,
purge: Boolean)
- extends RunnableCommand with PredicateHelper {
-
- private def isRangeComparison(expr: Expression): Boolean = {
- expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined
- }
+ extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
- val resolver = sparkSession.sessionState.conf.resolver
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
- specs.foreach { expr =>
- expr.references.foreach { attr =>
- if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
- throw new AnalysisException(s"${attr.name} is not a valid partition column " +
- s"in table ${table.identifier.quotedString}.")
- }
- }
+ val normalizedSpecs = specs.map { spec =>
+ PartitioningUtils.normalizePartitionSpec(
+ spec,
+ table.partitionColumnNames,
+ table.identifier.quotedString,
+ sparkSession.sessionState.conf.resolver)
}
- if (specs.exists(isRangeComparison)) {
- val partitionSet = specs.flatMap { spec =>
- val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(spec)).map(_.spec)
- if (partitions.isEmpty && !ifExists) {
- throw new AnalysisException(s"There is no partition for ${spec.sql}")
- }
- partitions
- }.distinct
- catalog.dropPartitions(
- table.identifier, partitionSet, ignoreIfNotExists = ifExists, purge = purge)
- } else {
- val normalizedSpecs = specs.map { expr =>
- val spec = splitConjunctivePredicates(expr).map {
- case BinaryComparison(AttributeReference(name, _, _, _), right) => name -> right.toString
- }.toMap
- PartitioningUtils.normalizePartitionSpec(
- spec,
- table.partitionColumnNames,
- table.identifier.quotedString,
- resolver)
- }
- catalog.dropPartitions(
- table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
- }
+ catalog.dropPartitions(
+ table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
Seq.empty[Row]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cffaf503/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e81512d..4f19a2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -215,14 +215,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
if (overwrite.enabled) {
val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
if (deletedPartitions.nonEmpty) {
- import org.apache.spark.sql.catalyst.expressions._
- val expressions = deletedPartitions.map { specs =>
- specs.map { case (key, value) =>
- EqualTo(AttributeReference(key, StringType)(), Literal.create(value, StringType))
- }.reduceLeft(And)
- }.toSeq
AlterTableDropPartitionCommand(
- l.catalogTable.get.identifier, expressions,
+ l.catalogTable.get.identifier, deletedPartitions.toSeq,
ifExists = true, purge = true).run(t.sparkSession)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cffaf503/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 057528b..d31e7ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -21,7 +21,6 @@ import scala.reflect.{classTag, ClassTag}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -613,12 +612,8 @@ class DDLCommandSuite extends PlanTest {
val expected1_table = AlterTableDropPartitionCommand(
tableIdent,
Seq(
- And(
- EqualTo(AttributeReference("dt", StringType)(), Literal.create("2008-08-08", StringType)),
- EqualTo(AttributeReference("country", StringType)(), Literal.create("us", StringType))),
- And(
- EqualTo(AttributeReference("dt", StringType)(), Literal.create("2009-09-09", StringType)),
- EqualTo(AttributeReference("country", StringType)(), Literal.create("uk", StringType)))),
+ Map("dt" -> "2008-08-08", "country" -> "us"),
+ Map("dt" -> "2009-09-09", "country" -> "uk")),
ifExists = true,
purge = false)
val expected2_table = expected1_table.copy(ifExists = false)
http://git-wip-us.apache.org/repos/asf/spark/blob/cffaf503/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 15e3927..951e070 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -226,108 +225,6 @@ class HiveDDLSuite
}
}
- test("SPARK-17732: Drop partitions by filter") {
- withTable("sales") {
- sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
-
- for (country <- Seq("US", "CA", "KR")) {
- for (quarter <- 1 to 4) {
- sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
- }
- }
-
- sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
- checkAnswer(sql("SHOW PARTITIONS sales"),
- Row("country=CA/quarter=1") ::
- Row("country=CA/quarter=2") ::
- Row("country=KR/quarter=1") ::
- Row("country=KR/quarter=2") ::
- Row("country=KR/quarter=3") ::
- Row("country=KR/quarter=4") ::
- Row("country=US/quarter=1") ::
- Row("country=US/quarter=2") ::
- Row("country=US/quarter=3") ::
- Row("country=US/quarter=4") :: Nil)
-
- sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')")
- checkAnswer(sql("SHOW PARTITIONS sales"),
- Row("country=KR/quarter=2") ::
- Row("country=KR/quarter=3") ::
- Row("country=KR/quarter=4") ::
- Row("country=US/quarter=2") ::
- Row("country=US/quarter=3") ::
- Row("country=US/quarter=4") :: Nil)
-
- sql("ALTER TABLE sales DROP PARTITION (country='KR', quarter='4')")
- sql("ALTER TABLE sales DROP PARTITION (country='US', quarter='3')")
- checkAnswer(sql("SHOW PARTITIONS sales"),
- Row("country=KR/quarter=2") ::
- Row("country=KR/quarter=3") ::
- Row("country=US/quarter=2") ::
- Row("country=US/quarter=4") :: Nil)
-
- sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')")
- checkAnswer(sql("SHOW PARTITIONS sales"),
- Row("country=KR/quarter=3") :: Nil)
-
- // According to the declarative partition spec definitions, this drops the union of target
- // partitions without exceptions. Hive raises exceptions because it handles them sequentially.
- sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')")
- checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
- }
- }
-
- test("SPARK-17732: Error handling for drop partitions by filter") {
- withTable("sales") {
- sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
-
- val m = intercept[AnalysisException] {
- sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')")
- }.getMessage
- assert(m.contains("unknown is not a valid partition column in table"))
-
- val m2 = intercept[AnalysisException] {
- sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')")
- }.getMessage
- assert(m2.contains("unknown is not a valid partition column in table"))
-
- val m3 = intercept[AnalysisException] {
- sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')")
- }.getMessage
- assert(m3.contains("'<=>' operator is not allowed in partition specification"))
-
- val m4 = intercept[ParseException] {
- sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))")
- }.getMessage
- assert(m4.contains("'<=>' operator is not allowed in partition specification"))
-
- val m5 = intercept[ParseException] {
- sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)")
- }.getMessage
- assert(m5.contains("Invalid partition filter specification"))
-
- sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')")
- val m6 = intercept[AnalysisException] {
- sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')")
- }.getMessage
- // The query is not executed because `PARTITION (quarter <= '2')` is invalid.
- checkAnswer(sql("SHOW PARTITIONS sales"),
- Row("country=KR/quarter=3") :: Nil)
- assert(m6.contains("There is no partition for (`quarter` <= '2')"))
- }
- }
-
- test("SPARK-17732: Partition filter is not allowed in ADD PARTITION") {
- withTable("sales") {
- sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
-
- val m = intercept[ParseException] {
- sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')")
- }.getMessage()
- assert(m.contains("Invalid partition filter specification"))
- }
- }
-
test("drop views") {
withTable("tab1") {
val tabName = "tab1"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org