Posted to commits@spark.apache.org by we...@apache.org on 2020/12/07 08:21:52 UTC
[spark] branch branch-3.1 updated: [SPARK-33676][SQL] Require exact
matching of partition spec to the schema in V2 `ALTER TABLE .. ADD/DROP
PARTITION`
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new a630e8d [SPARK-33676][SQL] Require exact matching of partition spec to the schema in V2 `ALTER TABLE .. ADD/DROP PARTITION`
a630e8d is described below
commit a630e8d14bd36ede97ba1469a6da148464c90ee3
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon Dec 7 08:14:36 2020 +0000
[SPARK-33676][SQL] Require exact matching of partition spec to the schema in V2 `ALTER TABLE .. ADD/DROP PARTITION`
### What changes were proposed in this pull request?
Check that partition specs passed to V2 `ALTER TABLE .. ADD/DROP PARTITION` exactly match the partition schema, i.e. all partition fields from the schema are specified in the partition specs.
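In essence, the check compares the sorted keys of the normalized spec against the table's partition column names. A condensed sketch of the helper (the real one is added to `PartitioningUtils` in the diff below):
```scala
import org.apache.spark.sql.AnalysisException

// Condensed sketch: the spec must name every partition column of the table;
// the order of the columns does not matter.
def requireExactMatchedPartitionSpec(
    tableName: String,
    spec: Map[String, String],
    partitionColumnNames: Seq[String]): Unit = {
  if (spec.keys.toSeq.sorted != partitionColumnNames.sorted) {
    throw new AnalysisException(
      s"Partition spec is invalid. The spec (${spec.keys.mkString(", ")}) must match " +
        s"the partition spec (${partitionColumnNames.mkString(", ")}) defined in table '$tableName'")
  }
}
```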
### Why are the changes needed?
1. To have the same behavior as V1 `ALTER TABLE .. ADD/DROP PARTITION`, which outputs the error:
```sql
spark-sql> create table tab1 (id int, a int, b int) using parquet partitioned by (a, b);
spark-sql> ALTER TABLE tab1 ADD PARTITION (A='9');
Error in query: Partition spec is invalid. The spec (a) must match the partition spec (a, b) defined in table '`default`.`tab1`';
```
2. To prevent future errors caused by partition specs that are not fully specified.
### Does this PR introduce _any_ user-facing change?
Yes. The V2 implementations of `ALTER TABLE .. ADD/DROP PARTITION` now output the same error as the V1 commands.
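For illustration, based on the new unit test below (the `testpart` catalog and `foo` provider are test-only fixtures), a partial spec now fails for both commands:
```sql
CREATE TABLE testpart.ns1.ns2.tbl (id bigint, part0 int, part1 string)
USING foo
PARTITIONED BY (part0, part1);

ALTER TABLE testpart.ns1.ns2.tbl ADD PARTITION (part0 = 1);
-- AnalysisException: Partition spec is invalid. The spec (part0) must match
-- the partition spec (part0, part1) defined in table ...

ALTER TABLE testpart.ns1.ns2.tbl DROP PARTITION (part0 = 1);
-- fails with the same error
```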
### How was this patch tested?
By running the test suite with the new unit test:
```
$ build/sbt "test:testOnly *AlterTablePartitionV2SQLSuite"
```
Closes #30624 from MaxGekk/add-partition-full-spec.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 26c0493318c2a3e5b74ff3829de88605aff8e832)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/catalyst/analysis/ResolvePartitionSpec.scala | 20 ++++++++++++++++----
.../spark/sql/catalyst/catalog/SessionCatalog.scala | 15 ++++++---------
.../apache/spark/sql/util/PartitioningUtils.scala | 18 ++++++++++++++++++
.../connector/AlterTablePartitionV2SQLSuite.scala | 20 ++++++++++++++++++++
4 files changed, 60 insertions(+), 13 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
index 38991a9..feb05d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
+import org.apache.spark.sql.util.PartitioningUtils.{normalizePartitionSpec, requireExactMatchedPartitionSpec}
/**
* Resolve [[UnresolvedPartitionSpec]] to [[ResolvedPartitionSpec]] in partition related commands.
@@ -35,11 +35,21 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case r @ AlterTableAddPartition(
ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs, _) =>
- r.copy(parts = resolvePartitionSpecs(table.name, partSpecs, table.partitionSchema()))
+ val partitionSchema = table.partitionSchema()
+ r.copy(parts = resolvePartitionSpecs(
+ table.name,
+ partSpecs,
+ partitionSchema,
+ requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)))
case r @ AlterTableDropPartition(
ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs, _, _, _) =>
- r.copy(parts = resolvePartitionSpecs(table.name, partSpecs, table.partitionSchema()))
+ val partitionSchema = table.partitionSchema()
+ r.copy(parts = resolvePartitionSpecs(
+ table.name,
+ partSpecs,
+ partitionSchema,
+ requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)))
case r @ ShowPartitions(ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs) =>
r.copy(pattern = resolvePartitionSpecs(
@@ -51,7 +61,8 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
private def resolvePartitionSpecs(
tableName: String,
partSpecs: Seq[PartitionSpec],
- partSchema: StructType): Seq[ResolvedPartitionSpec] =
+ partSchema: StructType,
+ checkSpec: TablePartitionSpec => Unit = _ => ()): Seq[ResolvedPartitionSpec] =
partSpecs.map {
case unresolvedPartSpec: UnresolvedPartitionSpec =>
val normalizedSpec = normalizePartitionSpec(
@@ -59,6 +70,7 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
partSchema.map(_.name),
tableName,
conf.resolver)
+ checkSpec(normalizedSpec)
val partitionNames = normalizedSpec.keySet
val requestedFields = partSchema.filter(field => partitionNames.contains(field.name))
ResolvedPartitionSpec(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 29481b8..9a21b85 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
import org.apache.spark.util.Utils
object SessionCatalog {
@@ -1162,14 +1162,11 @@ class SessionCatalog(
private def requireExactMatchedPartitionSpec(
specs: Seq[TablePartitionSpec],
table: CatalogTable): Unit = {
- val defined = table.partitionColumnNames.sorted
- specs.foreach { s =>
- if (s.keys.toSeq.sorted != defined) {
- throw new AnalysisException(
- s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must match " +
- s"the partition spec (${table.partitionColumnNames.mkString(", ")}) defined in " +
- s"table '${table.identifier}'")
- }
+ specs.foreach { spec =>
+ PartitioningUtils.requireExactMatchedPartitionSpec(
+ table.identifier.toString,
+ spec,
+ table.partitionColumnNames)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
index 586aa6c..e473e1d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.util
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
object PartitioningUtils {
/**
@@ -44,4 +45,21 @@ object PartitioningUtils {
normalizedPartSpec.toMap
}
+
+ /**
+ * Verify if the input partition spec exactly matches the existing defined partition spec
+ * The columns must be the same but the orders could be different.
+ */
+ def requireExactMatchedPartitionSpec(
+ tableName: String,
+ spec: TablePartitionSpec,
+ partitionColumnNames: Seq[String]): Unit = {
+ val defined = partitionColumnNames.sorted
+ if (spec.keys.toSeq.sorted != defined) {
+ throw new AnalysisException(
+ s"Partition spec is invalid. The spec (${spec.keys.mkString(", ")}) must match " +
+ s"the partition spec (${partitionColumnNames.mkString(", ")}) defined in " +
+ s"table '$tableName'")
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 47b5e5e..45d47c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -261,4 +261,24 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
}
}
}
+
+ test("SPARK-33676: not fully specified partition spec") {
+ val t = "testpart.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"""
+ |CREATE TABLE $t (id bigint, part0 int, part1 string)
+ |USING foo
+ |PARTITIONED BY (part0, part1)""".stripMargin)
+ Seq(
+ s"ALTER TABLE $t ADD PARTITION (part0 = 1)",
+ s"ALTER TABLE $t DROP PARTITION (part0 = 1)"
+ ).foreach { alterTable =>
+ val errMsg = intercept[AnalysisException] {
+ sql(alterTable)
+ }.getMessage
+ assert(errMsg.contains("Partition spec is invalid. " +
+ "The spec (part0) must match the partition spec (part0, part1)"))
+ }
+ }
+ }
}