You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/12/10 06:24:08 UTC

[spark] branch branch-3.1 updated: Revert "[SPARK-33558][SQL][TESTS] Unify v1 and v2 ALTER TABLE .. ADD PARTITION tests"

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new fdbfd14  Revert "[SPARK-33558][SQL][TESTS] Unify v1 and v2 ALTER TABLE .. ADD PARTITION tests"
fdbfd14 is described below

commit fdbfd1465f3811aec3fb68f82c1fdae9e5787226
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Dec 10 14:21:05 2020 +0800

    Revert "[SPARK-33558][SQL][TESTS] Unify v1 and v2 ALTER TABLE .. ADD PARTITION tests"
    
    This reverts commit 9282a337cb5aa47e936e600dfc8776cc92e9145d.
---
 .../catalyst/analysis/ResolvePartitionSpec.scala   |   2 +-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  27 +++
 .../connector/AlterTablePartitionV2SQLSuite.scala  | 152 +++++++++++++++--
 .../AlterTableAddPartitionParserSuite.scala        |  51 ------
 .../command/AlterTableAddPartitionSuiteBase.scala  | 187 ---------------------
 .../spark/sql/execution/command/DDLSuite.scala     |  61 +++++++
 .../command/v1/AlterTableAddPartitionSuite.scala   |  64 -------
 .../command/v2/AlterTableAddPartitionSuite.scala   |  89 ----------
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |   4 +
 .../command/AlterTableAddPartitionSuite.scala      |  46 -----
 10 files changed, 233 insertions(+), 450 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
index 099ac61..feb05d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
@@ -81,7 +81,7 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
         resolvedPartitionSpec
     }
 
-  private[sql] def convertToPartIdent(
+  private def convertToPartIdent(
       partitionSpec: TablePartitionSpec,
       schema: Seq[StructField]): InternalRow = {
     val partValues = schema.map { part =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index e767a76..0f1b4a3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2031,6 +2031,33 @@ class DDLParserSuite extends AnalysisTest {
       AlterTableRecoverPartitionsStatement(Seq("a", "b", "c")))
   }
 
+  test("alter table: add partition") {
+    val sql1 =
+      """
+        |ALTER TABLE a.b.c ADD IF NOT EXISTS PARTITION
+        |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
+        |(dt='2009-09-09', country='uk')
+      """.stripMargin
+    val sql2 = "ALTER TABLE a.b.c ADD PARTITION (dt='2008-08-08') LOCATION 'loc'"
+
+    val parsed1 = parsePlan(sql1)
+    val parsed2 = parsePlan(sql2)
+
+    val expected1 = AlterTableAddPartition(
+      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
+      Seq(
+        UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
+        UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
+      ifNotExists = true)
+    val expected2 = AlterTableAddPartition(
+      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
+      Seq(UnresolvedPartitionSpec(Map("dt" -> "2008-08-08"), Some("loc"))),
+      ifNotExists = false)
+
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+  }
+
   test("alter view: add partition (not supported)") {
     assertUnsupported(
       """
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 5709769..45d47c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -17,12 +17,16 @@
 
 package org.apache.spark.sql.connector
 
+import java.time.{LocalDate, LocalDateTime}
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.types.UTF8String
 
 class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
 
@@ -41,6 +45,66 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
     }
   }
 
+  test("ALTER TABLE ADD PARTITION") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
+      spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'")
+
+      val partTable = catalog("testpart").asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
+      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
+
+      val partMetadata = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(1)))
+      assert(partMetadata.containsKey("location"))
+      assert(partMetadata.get("location") == "loc")
+    }
+  }
+
+  test("ALTER TABLE ADD PARTITIONS") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
+      spark.sql(
+        s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc' PARTITION (id=2) LOCATION 'loc1'")
+
+      val partTable = catalog("testpart").asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
+      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
+      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(2))))
+
+      val partMetadata = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(1)))
+      assert(partMetadata.containsKey("location"))
+      assert(partMetadata.get("location") == "loc")
+
+      val partMetadata1 = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(2)))
+      assert(partMetadata1.containsKey("location"))
+      assert(partMetadata1.get("location") == "loc1")
+    }
+  }
+
+  test("ALTER TABLE ADD PARTITIONS: partition already exists") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
+      spark.sql(
+        s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      assertThrows[PartitionsAlreadyExistException](
+        spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'"))
+
+      val partTable = catalog("testpart").asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
+      assert(!partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
+
+      spark.sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
+      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(2))))
+    }
+  }
+
   test("ALTER TABLE RENAME PARTITION") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
@@ -109,7 +173,7 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
         val errMsg = intercept[AnalysisException] {
-          spark.sql(s"ALTER TABLE $t DROP PARTITION (ID=1)")
+          spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
         }.getMessage
         assert(errMsg.contains(s"ID is not a valid partition column in table $t"))
       }
@@ -128,14 +192,73 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
     }
   }
 
-  test("SPARK-33650: drop partition into a table which doesn't support partition management") {
+  test("SPARK-33521: universal type conversions of partition values") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"""
+        |CREATE TABLE $t (
+        |  part0 tinyint,
+        |  part1 smallint,
+        |  part2 int,
+        |  part3 bigint,
+        |  part4 float,
+        |  part5 double,
+        |  part6 string,
+        |  part7 boolean,
+        |  part8 date,
+        |  part9 timestamp
+        |) USING foo
+        |PARTITIONED BY (part0, part1, part2, part3, part4, part5, part6, part7, part8, part9)
+        |""".stripMargin)
+      val partTable = catalog("testpart").asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
+        .asPartitionable
+      val expectedPartition = InternalRow.fromSeq(Seq[Any](
+        -1,    // tinyint
+        0,     // smallint
+        1,     // int
+        2,     // bigint
+        3.14F, // float
+        3.14D, // double
+        UTF8String.fromString("abc"), // string
+        true, // boolean
+        LocalDate.parse("2020-11-23").toEpochDay,
+        DateTimeUtils.instantToMicros(
+          LocalDateTime.parse("2020-11-23T22:13:10.123456").atZone(DateTimeTestUtils.LA).toInstant)
+      ))
+      assert(!partTable.partitionExists(expectedPartition))
+      val partSpec = """
+        |  part0 = -1,
+        |  part1 = 0,
+        |  part2 = 1,
+        |  part3 = 2,
+        |  part4 = 3.14,
+        |  part5 = 3.14,
+        |  part6 = 'abc',
+        |  part7 = true,
+        |  part8 = '2020-11-23',
+        |  part9 = '2020-11-23T22:13:10.123456'
+        |""".stripMargin
+      sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'")
+      assert(partTable.partitionExists(expectedPartition))
+      sql(s" ALTER TABLE $t DROP PARTITION ($partSpec)")
+      assert(!partTable.partitionExists(expectedPartition))
+    }
+  }
+
+  test("SPARK-33650: add/drop partition into a table which doesn't support partition management") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING _")
-      val errMsg = intercept[AnalysisException] {
-        spark.sql(s"ALTER TABLE $t DROP PARTITION (id=1)")
-      }.getMessage
-      assert(errMsg.contains(s"Table $t can not alter partitions"))
+      Seq(
+        s"ALTER TABLE $t ADD PARTITION (id=1)",
+        s"ALTER TABLE $t DROP PARTITION (id=1)"
+      ).foreach { alterTable =>
+        val errMsg = intercept[AnalysisException] {
+          spark.sql(alterTable)
+        }.getMessage
+        assert(errMsg.contains(s"Table $t can not alter partitions"))
+      }
     }
   }
 
@@ -146,11 +269,16 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
         |CREATE TABLE $t (id bigint, part0 int, part1 string)
         |USING foo
         |PARTITIONED BY (part0, part1)""".stripMargin)
-      val errMsg = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t DROP PARTITION (part0 = 1)")
-      }.getMessage
-      assert(errMsg.contains("Partition spec is invalid. " +
-        "The spec (part0) must match the partition spec (part0, part1)"))
+      Seq(
+        s"ALTER TABLE $t ADD PARTITION (part0 = 1)",
+        s"ALTER TABLE $t DROP PARTITION (part0 = 1)"
+      ).foreach { alterTable =>
+        val errMsg = intercept[AnalysisException] {
+          sql(alterTable)
+        }.getMessage
+        assert(errMsg.contains("Partition spec is invalid. " +
+          "The spec (part0) must match the partition spec (part0, part1)"))
+      }
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala
deleted file mode 100644
index 5ebca8f..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command
-
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedPartitionSpec, UnresolvedTable}
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
-import org.apache.spark.sql.catalyst.plans.logical.AlterTableAddPartition
-import org.apache.spark.sql.test.SharedSparkSession
-
-class AlterTableAddPartitionParserSuite extends AnalysisTest with SharedSparkSession {
-  test("add partition if not exists") {
-    val sql = """
-      |ALTER TABLE a.b.c ADD IF NOT EXISTS PARTITION
-      |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
-      |(dt='2009-09-09', country='uk')""".stripMargin
-    val parsed = parsePlan(sql)
-    val expected = AlterTableAddPartition(
-      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
-      Seq(
-        UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
-        UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
-      ifNotExists = true)
-    comparePlans(parsed, expected)
-  }
-
-  test("add partition") {
-    val sql = "ALTER TABLE a.b.c ADD PARTITION (dt='2008-08-08') LOCATION 'loc'"
-    val parsed = parsePlan(sql)
-    val expected = AlterTableAddPartition(
-      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
-      Seq(UnresolvedPartitionSpec(Map("dt" -> "2008-08-08"), Some("loc"))),
-      ifNotExists = false)
-
-    comparePlans(parsed, expected)
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
deleted file mode 100644
index 0cf0b39..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command
-
-import org.scalactic.source.Position
-import org.scalatest.Tag
-
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
-
-trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
-  protected def version: String
-  protected def catalog: String
-  protected def defaultUsing: String
-
-  override def test(testName: String, testTags: Tag*)(testFun: => Any)
-    (implicit pos: Position): Unit = {
-    super.test(s"ALTER TABLE .. ADD PARTITION $version: " + testName, testTags: _*)(testFun)
-  }
-
-  protected def checkPartitions(t: String, expected: Map[String, String]*): Unit = {
-    val partitions = sql(s"SHOW PARTITIONS $t")
-      .collect()
-      .toSet
-      .map((row: Row) => row.getString(0))
-      .map(PartitioningUtils.parsePathFragment)
-    assert(partitions === expected.toSet)
-  }
-  protected def checkLocation(t: String, spec: TablePartitionSpec, expected: String): Unit
-
-  protected def withNsTable(ns: String, tableName: String, cat: String = catalog)
-      (f: String => Unit): Unit = {
-    val nsCat = s"$cat.$ns"
-    withNamespace(nsCat) {
-      sql(s"CREATE NAMESPACE $nsCat")
-      val t = s"$nsCat.$tableName"
-      withTable(t) {
-        f(t)
-      }
-    }
-  }
-
-  test("one partition") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      Seq("", "IF NOT EXISTS").foreach { exists =>
-        sql(s"ALTER TABLE $t ADD $exists PARTITION (id=1) LOCATION 'loc'")
-
-        checkPartitions(t, Map("id" -> "1"))
-        checkLocation(t, Map("id" -> "1"), "loc")
-      }
-    }
-  }
-
-  test("multiple partitions") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      Seq("", "IF NOT EXISTS").foreach { exists =>
-        sql(s"""
-          |ALTER TABLE $t ADD $exists
-          |PARTITION (id=1) LOCATION 'loc'
-          |PARTITION (id=2) LOCATION 'loc1'""".stripMargin)
-
-        checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
-        checkLocation(t, Map("id" -> "1"), "loc")
-        checkLocation(t, Map("id" -> "2"), "loc1")
-      }
-    }
-  }
-
-  test("multi-part partition") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, a int, b string) $defaultUsing PARTITIONED BY (a, b)")
-      Seq("", "IF NOT EXISTS").foreach { exists =>
-        sql(s"ALTER TABLE $t ADD $exists PARTITION (a=2, b='abc')")
-
-        checkPartitions(t, Map("a" -> "2", "b" -> "abc"))
-      }
-    }
-  }
-
-  test("table to alter does not exist") {
-    withNsTable("ns", "does_not_exist") { t =>
-      val errMsg = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (a='4', b='9')")
-      }.getMessage
-      assert(errMsg.contains("Table not found"))
-    }
-  }
-
-  test("case sensitivity in resolving partition specs") {
-    withNsTable("ns", "tbl") { t =>
-      spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-        val errMsg = intercept[AnalysisException] {
-          spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
-        }.getMessage
-        assert(errMsg.contains("ID is not a valid partition column"))
-      }
-      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
-        spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
-        checkPartitions(t, Map("id" -> "1"))
-        checkLocation(t, Map("id" -> "1"), "loc1")
-      }
-    }
-  }
-
-  test("SPARK-33521: universal type conversions of partition values") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"""
-        |CREATE TABLE $t (
-        |  id int,
-        |  part0 tinyint,
-        |  part1 smallint,
-        |  part2 int,
-        |  part3 bigint,
-        |  part4 float,
-        |  part5 double,
-        |  part6 string,
-        |  part7 boolean,
-        |  part8 date,
-        |  part9 timestamp
-        |) $defaultUsing
-        |PARTITIONED BY (part0, part1, part2, part3, part4, part5, part6, part7, part8, part9)
-        |""".stripMargin)
-      val partSpec = """
-        |  part0 = -1,
-        |  part1 = 0,
-        |  part2 = 1,
-        |  part3 = 2,
-        |  part4 = 3.14,
-        |  part5 = 3.14,
-        |  part6 = 'abc',
-        |  part7 = true,
-        |  part8 = '2020-11-23',
-        |  part9 = '2020-11-23 22:13:10.123456'
-        |""".stripMargin
-      sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'")
-      val expected = Map(
-        "part0" -> "-1",
-        "part1" -> "0",
-        "part2" -> "1",
-        "part3" -> "2",
-        "part4" -> "3.14",
-        "part5" -> "3.14",
-        "part6" -> "abc",
-        "part7" -> "true",
-        "part8" -> "2020-11-23",
-        "part9" -> "2020-11-23 22:13:10.123456")
-      checkPartitions(t, expected)
-      sql(s"ALTER TABLE $t DROP PARTITION ($partSpec)")
-      checkPartitions(t) // no partitions
-    }
-  }
-
-  test("SPARK-33676: not fully specified partition spec") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"""
-        |CREATE TABLE $t (id bigint, part0 int, part1 string)
-        |$defaultUsing
-        |PARTITIONED BY (part0, part1)""".stripMargin)
-      val errMsg = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (part0 = 1)")
-      }.getMessage
-      assert(errMsg.contains("Partition spec is invalid. " +
-        "The spec (part0) must match the partition spec (part0, part1)"))
-    }
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index f6268ef..4f79e71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -334,6 +334,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     testChangeColumn(isDatasourceTable = true)
   }
 
+  test("alter table: add partition (datasource table)") {
+    testAddPartitions(isDatasourceTable = true)
+  }
+
   test("alter table: drop partition (datasource table)") {
     testDropPartitions(isDatasourceTable = true)
   }
@@ -1618,6 +1622,63 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  protected def testAddPartitions(isDatasourceTable: Boolean): Unit = {
+    if (!isUsingHiveMetastore) {
+      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
+    }
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val part1 = Map("a" -> "1", "b" -> "5")
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val part3 = Map("a" -> "3", "b" -> "7")
+    val part4 = Map("a" -> "4", "b" -> "8")
+    val part5 = Map("a" -> "9", "b" -> "9")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent, isDatasourceTable)
+    createTablePartition(catalog, part1, tableIdent)
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+
+    // basic add partition
+    sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
+      "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+    assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined)
+
+    val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
+    assert(tableLocation.isDefined)
+    val partitionLocation = makeQualifiedPath(
+      new Path(tableLocation.get.toString, "paris").toString)
+
+    assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(partitionLocation))
+    assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined)
+
+    // add partitions without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3, part4))
+
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (a='4', b='9')")
+    }
+
+    // partition to add already exists
+    intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 ADD PARTITION (a='4', b='8')")
+    }
+
+    // partition to add already exists when using IF NOT EXISTS
+    sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3, part4))
+
+    // partition spec in ADD PARTITION should be case insensitive by default
+    sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3, part4, part5))
+  }
+
   protected def testDropPartitions(isDatasourceTable: Boolean): Unit = {
     if (!isUsingHiveMetastore) {
       assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
deleted file mode 100644
index 295ce1d..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.v1
-
-import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.execution.command
-import org.apache.spark.sql.test.SharedSparkSession
-
-trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuiteBase {
-  override def version: String = "V1"
-  override def catalog: String = CatalogManager.SESSION_CATALOG_NAME
-  override def defaultUsing: String = "USING parquet"
-
-  override protected def checkLocation(
-      t: String,
-      spec: TablePartitionSpec,
-      expected: String): Unit = {
-    val tablePath = t.split('.')
-    val tableName = tablePath.last
-    val ns = tablePath.init.mkString(".")
-    val partSpec = spec.map { case (key, value) => s"$key = $value"}.mkString(", ")
-    val information = sql(s"SHOW TABLE EXTENDED IN $ns LIKE '$tableName' PARTITION($partSpec)")
-      .select("information")
-      .first().getString(0)
-    val location = information.split("\\r?\\n").filter(_.startsWith("Location:")).head
-    assert(location.endsWith(expected))
-  }
-}
-
-class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession {
-  test("partition already exists") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      val errMsg = intercept[PartitionsAlreadyExistException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'")
-      }.getMessage
-      assert(errMsg.contains("The following partitions already exists"))
-
-      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
-    }
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
deleted file mode 100644
index b15235d..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.v2
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{PartitionsAlreadyExistException, ResolvePartitionSpec}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
-import org.apache.spark.sql.execution.command
-import org.apache.spark.sql.test.SharedSparkSession
-
-class AlterTableAddPartitionSuite
-  extends command.AlterTableAddPartitionSuiteBase
-  with SharedSparkSession {
-
-  import CatalogV2Implicits._
-
-  override def version: String = "V2"
-  override def catalog: String = "test_catalog"
-  override def defaultUsing: String = "USING _"
-
-  override def sparkConf: SparkConf = super.sparkConf
-    .set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName)
-    .set(s"spark.sql.catalog.non_part_$catalog", classOf[InMemoryTableCatalog].getName)
-
-  override protected def checkLocation(
-      t: String,
-      spec: TablePartitionSpec,
-      expected: String): Unit = {
-    val tablePath = t.split('.')
-    val catalogName = tablePath.head
-    val namespaceWithTable = tablePath.tail
-    val namespaces = namespaceWithTable.init
-    val tableName = namespaceWithTable.last
-    val catalogPlugin = spark.sessionState.catalogManager.catalog(catalogName)
-    val partTable = catalogPlugin.asTableCatalog
-      .loadTable(Identifier.of(namespaces, tableName))
-      .asInstanceOf[InMemoryPartitionTable]
-    val ident = ResolvePartitionSpec.convertToPartIdent(spec, partTable.partitionSchema.fields)
-    val partMetadata = partTable.loadPartitionMetadata(ident)
-
-    assert(partMetadata.containsKey("location"))
-    assert(partMetadata.get("location") === expected)
-  }
-
-  test("partition already exists") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      val errMsg = intercept[PartitionsAlreadyExistException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'")
-      }.getMessage
-      assert(errMsg.contains("The following partitions already exists"))
-
-      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
-    }
-  }
-
-  test("SPARK-33650: add partition into a table which doesn't support partition management") {
-    withNsTable("ns", "tbl", s"non_part_$catalog") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
-      val errMsg = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (id=1)")
-      }.getMessage
-      assert(errMsg.contains(s"Table $t can not alter partitions"))
-    }
-  }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 7302ee9..ce31e39 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -166,6 +166,10 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
     testDropPartitions(isDatasourceTable = false)
   }
 
+  test("alter table: add partition") {
+    testAddPartitions(isDatasourceTable = false)
+  }
+
   test("drop table") {
     testDropTable(isDatasourceTable = false)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
deleted file mode 100644
index ef0ec8d..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution.command
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.execution.command.v1
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-
-class AlterTableAddPartitionSuite
-    extends v1.AlterTableAddPartitionSuiteBase
-    with TestHiveSingleton {
-  override def version: String = "Hive V1"
-  override def defaultUsing: String = "USING HIVE"
-
-  test("partition already exists") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      val errMsg = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'")
-      }.getMessage
-      assert(errMsg.contains("already exists"))
-
-      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org