You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/12/07 08:21:52 UTC

[spark] branch branch-3.1 updated: [SPARK-33676][SQL] Require exact matching of partition spec to the schema in V2 `ALTER TABLE .. ADD/DROP PARTITION`

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new a630e8d  [SPARK-33676][SQL] Require exact matching of partition spec to the schema in V2 `ALTER TABLE .. ADD/DROP PARTITION`
a630e8d is described below

commit a630e8d14bd36ede97ba1469a6da148464c90ee3
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon Dec 7 08:14:36 2020 +0000

    [SPARK-33676][SQL] Require exact matching of partition spec to the schema in V2 `ALTER TABLE .. ADD/DROP PARTITION`
    
    ### What changes were proposed in this pull request?
    Check that partitions specs passed to v2 `ALTER TABLE .. ADD/DROP PARTITION` exactly match to the partition schema (all partition fields from the schema are specified in partition specs).
    
    ### Why are the changes needed?
    1. To have the same behavior as V1 `ALTER TABLE .. ADD/DROP PARTITION` that output the error:
    ```sql
    spark-sql> create table tab1 (id int, a int, b int) using parquet partitioned by (a, b);
    spark-sql> ALTER TABLE tab1 ADD PARTITION (A='9');
    Error in query: Partition spec is invalid. The spec (a) must match the partition spec (a, b) defined in table '`default`.`tab1`';
    ```
    2. To prevent future errors caused by not fully specified partition specs.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. The V2 implementation of `ALTER TABLE .. ADD/DROP PARTITION` output the same error as V1 commands.
    
    ### How was this patch tested?
    By running the test suite with new UT:
    ```
    $ build/sbt "test:testOnly *AlterTablePartitionV2SQLSuite"
    ```
    
    Closes #30624 from MaxGekk/add-partition-full-spec.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 26c0493318c2a3e5b74ff3829de88605aff8e832)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/analysis/ResolvePartitionSpec.scala | 20 ++++++++++++++++----
 .../spark/sql/catalyst/catalog/SessionCatalog.scala  | 15 ++++++---------
 .../apache/spark/sql/util/PartitioningUtils.scala    | 18 ++++++++++++++++++
 .../connector/AlterTablePartitionV2SQLSuite.scala    | 20 ++++++++++++++++++++
 4 files changed, 60 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
index 38991a9..feb05d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
+import org.apache.spark.sql.util.PartitioningUtils.{normalizePartitionSpec, requireExactMatchedPartitionSpec}
 
 /**
  * Resolve [[UnresolvedPartitionSpec]] to [[ResolvedPartitionSpec]] in partition related commands.
@@ -35,11 +35,21 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case r @ AlterTableAddPartition(
         ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs, _) =>
-      r.copy(parts = resolvePartitionSpecs(table.name, partSpecs, table.partitionSchema()))
+      val partitionSchema = table.partitionSchema()
+      r.copy(parts = resolvePartitionSpecs(
+        table.name,
+        partSpecs,
+        partitionSchema,
+        requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)))
 
     case r @ AlterTableDropPartition(
         ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs, _, _, _) =>
-      r.copy(parts = resolvePartitionSpecs(table.name, partSpecs, table.partitionSchema()))
+      val partitionSchema = table.partitionSchema()
+      r.copy(parts = resolvePartitionSpecs(
+        table.name,
+        partSpecs,
+        partitionSchema,
+        requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)))
 
     case r @ ShowPartitions(ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs) =>
       r.copy(pattern = resolvePartitionSpecs(
@@ -51,7 +61,8 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
   private def resolvePartitionSpecs(
       tableName: String,
       partSpecs: Seq[PartitionSpec],
-      partSchema: StructType): Seq[ResolvedPartitionSpec] =
+      partSchema: StructType,
+      checkSpec: TablePartitionSpec => Unit = _ => ()): Seq[ResolvedPartitionSpec] =
     partSpecs.map {
       case unresolvedPartSpec: UnresolvedPartitionSpec =>
         val normalizedSpec = normalizePartitionSpec(
@@ -59,6 +70,7 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
           partSchema.map(_.name),
           tableName,
           conf.resolver)
+        checkSpec(normalizedSpec)
         val partitionNames = normalizedSpec.keySet
         val requestedFields = partSchema.filter(field => partitionNames.contains(field.name))
         ResolvedPartitionSpec(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 29481b8..9a21b85 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
 import org.apache.spark.util.Utils
 
 object SessionCatalog {
@@ -1162,14 +1162,11 @@ class SessionCatalog(
   private def requireExactMatchedPartitionSpec(
       specs: Seq[TablePartitionSpec],
       table: CatalogTable): Unit = {
-    val defined = table.partitionColumnNames.sorted
-    specs.foreach { s =>
-      if (s.keys.toSeq.sorted != defined) {
-        throw new AnalysisException(
-          s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must match " +
-            s"the partition spec (${table.partitionColumnNames.mkString(", ")}) defined in " +
-            s"table '${table.identifier}'")
-      }
+    specs.foreach { spec =>
+      PartitioningUtils.requireExactMatchedPartitionSpec(
+        table.identifier.toString,
+        spec,
+        table.partitionColumnNames)
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
index 586aa6c..e473e1d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.util
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 
 object PartitioningUtils {
   /**
@@ -44,4 +45,21 @@ object PartitioningUtils {
 
     normalizedPartSpec.toMap
   }
+
+  /**
+   * Verify if the input partition spec exactly matches the existing defined partition spec
+   * The columns must be the same but the orders could be different.
+   */
+  def requireExactMatchedPartitionSpec(
+      tableName: String,
+      spec: TablePartitionSpec,
+      partitionColumnNames: Seq[String]): Unit = {
+    val defined = partitionColumnNames.sorted
+    if (spec.keys.toSeq.sorted != defined) {
+      throw new AnalysisException(
+        s"Partition spec is invalid. The spec (${spec.keys.mkString(", ")}) must match " +
+        s"the partition spec (${partitionColumnNames.mkString(", ")}) defined in " +
+        s"table '$tableName'")
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 47b5e5e..45d47c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -261,4 +261,24 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       }
     }
   }
+
+  test("SPARK-33676: not fully specified partition spec") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"""
+        |CREATE TABLE $t (id bigint, part0 int, part1 string)
+        |USING foo
+        |PARTITIONED BY (part0, part1)""".stripMargin)
+      Seq(
+        s"ALTER TABLE $t ADD PARTITION (part0 = 1)",
+        s"ALTER TABLE $t DROP PARTITION (part0 = 1)"
+      ).foreach { alterTable =>
+        val errMsg = intercept[AnalysisException] {
+          sql(alterTable)
+        }.getMessage
+        assert(errMsg.contains("Partition spec is invalid. " +
+          "The spec (part0) must match the partition spec (part0, part1)"))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org