You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/12/10 06:24:08 UTC
[spark] branch branch-3.1 updated: Revert "[SPARK-33558][SQL][TESTS] Unify v1 and v2 ALTER TABLE .. ADD PARTITION tests"
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new fdbfd14 Revert "[SPARK-33558][SQL][TESTS] Unify v1 and v2 ALTER TABLE .. ADD PARTITION tests"
fdbfd14 is described below
commit fdbfd1465f3811aec3fb68f82c1fdae9e5787226
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Dec 10 14:21:05 2020 +0800
Revert "[SPARK-33558][SQL][TESTS] Unify v1 and v2 ALTER TABLE .. ADD PARTITION tests"
This reverts commit 9282a337cb5aa47e936e600dfc8776cc92e9145d.
---
.../catalyst/analysis/ResolvePartitionSpec.scala | 2 +-
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 27 +++
.../connector/AlterTablePartitionV2SQLSuite.scala | 152 +++++++++++++++--
.../AlterTableAddPartitionParserSuite.scala | 51 ------
.../command/AlterTableAddPartitionSuiteBase.scala | 187 ---------------------
.../spark/sql/execution/command/DDLSuite.scala | 61 +++++++
.../command/v1/AlterTableAddPartitionSuite.scala | 64 -------
.../command/v2/AlterTableAddPartitionSuite.scala | 89 ----------
.../spark/sql/hive/execution/HiveDDLSuite.scala | 4 +
.../command/AlterTableAddPartitionSuite.scala | 46 -----
10 files changed, 233 insertions(+), 450 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
index 099ac61..feb05d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
@@ -81,7 +81,7 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
resolvedPartitionSpec
}
- private[sql] def convertToPartIdent(
+ private def convertToPartIdent(
partitionSpec: TablePartitionSpec,
schema: Seq[StructField]): InternalRow = {
val partValues = schema.map { part =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index e767a76..0f1b4a3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2031,6 +2031,33 @@ class DDLParserSuite extends AnalysisTest {
AlterTableRecoverPartitionsStatement(Seq("a", "b", "c")))
}
+ test("alter table: add partition") {
+ val sql1 =
+ """
+ |ALTER TABLE a.b.c ADD IF NOT EXISTS PARTITION
+ |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
+ |(dt='2009-09-09', country='uk')
+ """.stripMargin
+ val sql2 = "ALTER TABLE a.b.c ADD PARTITION (dt='2008-08-08') LOCATION 'loc'"
+
+ val parsed1 = parsePlan(sql1)
+ val parsed2 = parsePlan(sql2)
+
+ val expected1 = AlterTableAddPartition(
+ UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
+ Seq(
+ UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
+ UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
+ ifNotExists = true)
+ val expected2 = AlterTableAddPartition(
+ UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
+ Seq(UnresolvedPartitionSpec(Map("dt" -> "2008-08-08"), Some("loc"))),
+ ifNotExists = false)
+
+ comparePlans(parsed1, expected1)
+ comparePlans(parsed2, expected2)
+ }
+
test("alter view: add partition (not supported)") {
assertUnsupported(
"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 5709769..45d47c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -17,12 +17,16 @@
package org.apache.spark.sql.connector
+import java.time.{LocalDate, LocalDateTime}
+
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.types.UTF8String
class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
@@ -41,6 +45,66 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
}
}
+ test("ALTER TABLE ADD PARTITION") {
+ val t = "testpart.ns1.ns2.tbl"
+ withTable(t) {
+ spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
+ spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'")
+
+ val partTable = catalog("testpart").asTableCatalog
+ .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
+ assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
+
+ val partMetadata = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(1)))
+ assert(partMetadata.containsKey("location"))
+ assert(partMetadata.get("location") == "loc")
+ }
+ }
+
+ test("ALTER TABLE ADD PARTITIONS") {
+ val t = "testpart.ns1.ns2.tbl"
+ withTable(t) {
+ spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
+ spark.sql(
+ s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc' PARTITION (id=2) LOCATION 'loc1'")
+
+ val partTable = catalog("testpart").asTableCatalog
+ .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
+ assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
+ assert(partTable.partitionExists(InternalRow.fromSeq(Seq(2))))
+
+ val partMetadata = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(1)))
+ assert(partMetadata.containsKey("location"))
+ assert(partMetadata.get("location") == "loc")
+
+ val partMetadata1 = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(2)))
+ assert(partMetadata1.containsKey("location"))
+ assert(partMetadata1.get("location") == "loc1")
+ }
+ }
+
+ test("ALTER TABLE ADD PARTITIONS: partition already exists") {
+ val t = "testpart.ns1.ns2.tbl"
+ withTable(t) {
+ spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
+ spark.sql(
+ s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+ assertThrows[PartitionsAlreadyExistException](
+ spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
+ " PARTITION (id=2) LOCATION 'loc1'"))
+
+ val partTable = catalog("testpart").asTableCatalog
+ .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
+ assert(!partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
+
+ spark.sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+ " PARTITION (id=2) LOCATION 'loc1'")
+ assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
+ assert(partTable.partitionExists(InternalRow.fromSeq(Seq(2))))
+ }
+ }
+
test("ALTER TABLE RENAME PARTITION") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
@@ -109,7 +173,7 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val errMsg = intercept[AnalysisException] {
- spark.sql(s"ALTER TABLE $t DROP PARTITION (ID=1)")
+ spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
}.getMessage
assert(errMsg.contains(s"ID is not a valid partition column in table $t"))
}
@@ -128,14 +192,73 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
}
}
- test("SPARK-33650: drop partition into a table which doesn't support partition management") {
+ test("SPARK-33521: universal type conversions of partition values") {
+ val t = "testpart.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"""
+ |CREATE TABLE $t (
+ | part0 tinyint,
+ | part1 smallint,
+ | part2 int,
+ | part3 bigint,
+ | part4 float,
+ | part5 double,
+ | part6 string,
+ | part7 boolean,
+ | part8 date,
+ | part9 timestamp
+ |) USING foo
+ |PARTITIONED BY (part0, part1, part2, part3, part4, part5, part6, part7, part8, part9)
+ |""".stripMargin)
+ val partTable = catalog("testpart").asTableCatalog
+ .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
+ .asPartitionable
+ val expectedPartition = InternalRow.fromSeq(Seq[Any](
+ -1, // tinyint
+ 0, // smallint
+ 1, // int
+ 2, // bigint
+ 3.14F, // float
+ 3.14D, // double
+ UTF8String.fromString("abc"), // string
+ true, // boolean
+ LocalDate.parse("2020-11-23").toEpochDay,
+ DateTimeUtils.instantToMicros(
+ LocalDateTime.parse("2020-11-23T22:13:10.123456").atZone(DateTimeTestUtils.LA).toInstant)
+ ))
+ assert(!partTable.partitionExists(expectedPartition))
+ val partSpec = """
+ | part0 = -1,
+ | part1 = 0,
+ | part2 = 1,
+ | part3 = 2,
+ | part4 = 3.14,
+ | part5 = 3.14,
+ | part6 = 'abc',
+ | part7 = true,
+ | part8 = '2020-11-23',
+ | part9 = '2020-11-23T22:13:10.123456'
+ |""".stripMargin
+ sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'")
+ assert(partTable.partitionExists(expectedPartition))
+ sql(s" ALTER TABLE $t DROP PARTITION ($partSpec)")
+ assert(!partTable.partitionExists(expectedPartition))
+ }
+ }
+
+ test("SPARK-33650: add/drop partition into a table which doesn't support partition management") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING _")
- val errMsg = intercept[AnalysisException] {
- spark.sql(s"ALTER TABLE $t DROP PARTITION (id=1)")
- }.getMessage
- assert(errMsg.contains(s"Table $t can not alter partitions"))
+ Seq(
+ s"ALTER TABLE $t ADD PARTITION (id=1)",
+ s"ALTER TABLE $t DROP PARTITION (id=1)"
+ ).foreach { alterTable =>
+ val errMsg = intercept[AnalysisException] {
+ spark.sql(alterTable)
+ }.getMessage
+ assert(errMsg.contains(s"Table $t can not alter partitions"))
+ }
}
}
@@ -146,11 +269,16 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
|CREATE TABLE $t (id bigint, part0 int, part1 string)
|USING foo
|PARTITIONED BY (part0, part1)""".stripMargin)
- val errMsg = intercept[AnalysisException] {
- sql(s"ALTER TABLE $t DROP PARTITION (part0 = 1)")
- }.getMessage
- assert(errMsg.contains("Partition spec is invalid. " +
- "The spec (part0) must match the partition spec (part0, part1)"))
+ Seq(
+ s"ALTER TABLE $t ADD PARTITION (part0 = 1)",
+ s"ALTER TABLE $t DROP PARTITION (part0 = 1)"
+ ).foreach { alterTable =>
+ val errMsg = intercept[AnalysisException] {
+ sql(alterTable)
+ }.getMessage
+ assert(errMsg.contains("Partition spec is invalid. " +
+ "The spec (part0) must match the partition spec (part0, part1)"))
+ }
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala
deleted file mode 100644
index 5ebca8f..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command
-
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedPartitionSpec, UnresolvedTable}
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
-import org.apache.spark.sql.catalyst.plans.logical.AlterTableAddPartition
-import org.apache.spark.sql.test.SharedSparkSession
-
-class AlterTableAddPartitionParserSuite extends AnalysisTest with SharedSparkSession {
- test("add partition if not exists") {
- val sql = """
- |ALTER TABLE a.b.c ADD IF NOT EXISTS PARTITION
- |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
- |(dt='2009-09-09', country='uk')""".stripMargin
- val parsed = parsePlan(sql)
- val expected = AlterTableAddPartition(
- UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
- Seq(
- UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
- UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
- ifNotExists = true)
- comparePlans(parsed, expected)
- }
-
- test("add partition") {
- val sql = "ALTER TABLE a.b.c ADD PARTITION (dt='2008-08-08') LOCATION 'loc'"
- val parsed = parsePlan(sql)
- val expected = AlterTableAddPartition(
- UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
- Seq(UnresolvedPartitionSpec(Map("dt" -> "2008-08-08"), Some("loc"))),
- ifNotExists = false)
-
- comparePlans(parsed, expected)
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
deleted file mode 100644
index 0cf0b39..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command
-
-import org.scalactic.source.Position
-import org.scalatest.Tag
-
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
-
-trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
- protected def version: String
- protected def catalog: String
- protected def defaultUsing: String
-
- override def test(testName: String, testTags: Tag*)(testFun: => Any)
- (implicit pos: Position): Unit = {
- super.test(s"ALTER TABLE .. ADD PARTITION $version: " + testName, testTags: _*)(testFun)
- }
-
- protected def checkPartitions(t: String, expected: Map[String, String]*): Unit = {
- val partitions = sql(s"SHOW PARTITIONS $t")
- .collect()
- .toSet
- .map((row: Row) => row.getString(0))
- .map(PartitioningUtils.parsePathFragment)
- assert(partitions === expected.toSet)
- }
- protected def checkLocation(t: String, spec: TablePartitionSpec, expected: String): Unit
-
- protected def withNsTable(ns: String, tableName: String, cat: String = catalog)
- (f: String => Unit): Unit = {
- val nsCat = s"$cat.$ns"
- withNamespace(nsCat) {
- sql(s"CREATE NAMESPACE $nsCat")
- val t = s"$nsCat.$tableName"
- withTable(t) {
- f(t)
- }
- }
- }
-
- test("one partition") {
- withNsTable("ns", "tbl") { t =>
- sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
- Seq("", "IF NOT EXISTS").foreach { exists =>
- sql(s"ALTER TABLE $t ADD $exists PARTITION (id=1) LOCATION 'loc'")
-
- checkPartitions(t, Map("id" -> "1"))
- checkLocation(t, Map("id" -> "1"), "loc")
- }
- }
- }
-
- test("multiple partitions") {
- withNsTable("ns", "tbl") { t =>
- sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
- Seq("", "IF NOT EXISTS").foreach { exists =>
- sql(s"""
- |ALTER TABLE $t ADD $exists
- |PARTITION (id=1) LOCATION 'loc'
- |PARTITION (id=2) LOCATION 'loc1'""".stripMargin)
-
- checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
- checkLocation(t, Map("id" -> "1"), "loc")
- checkLocation(t, Map("id" -> "2"), "loc1")
- }
- }
- }
-
- test("multi-part partition") {
- withNsTable("ns", "tbl") { t =>
- sql(s"CREATE TABLE $t (id bigint, a int, b string) $defaultUsing PARTITIONED BY (a, b)")
- Seq("", "IF NOT EXISTS").foreach { exists =>
- sql(s"ALTER TABLE $t ADD $exists PARTITION (a=2, b='abc')")
-
- checkPartitions(t, Map("a" -> "2", "b" -> "abc"))
- }
- }
- }
-
- test("table to alter does not exist") {
- withNsTable("ns", "does_not_exist") { t =>
- val errMsg = intercept[AnalysisException] {
- sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (a='4', b='9')")
- }.getMessage
- assert(errMsg.contains("Table not found"))
- }
- }
-
- test("case sensitivity in resolving partition specs") {
- withNsTable("ns", "tbl") { t =>
- spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
- withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
- val errMsg = intercept[AnalysisException] {
- spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
- }.getMessage
- assert(errMsg.contains("ID is not a valid partition column"))
- }
- withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
- spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
- checkPartitions(t, Map("id" -> "1"))
- checkLocation(t, Map("id" -> "1"), "loc1")
- }
- }
- }
-
- test("SPARK-33521: universal type conversions of partition values") {
- withNsTable("ns", "tbl") { t =>
- sql(s"""
- |CREATE TABLE $t (
- | id int,
- | part0 tinyint,
- | part1 smallint,
- | part2 int,
- | part3 bigint,
- | part4 float,
- | part5 double,
- | part6 string,
- | part7 boolean,
- | part8 date,
- | part9 timestamp
- |) $defaultUsing
- |PARTITIONED BY (part0, part1, part2, part3, part4, part5, part6, part7, part8, part9)
- |""".stripMargin)
- val partSpec = """
- | part0 = -1,
- | part1 = 0,
- | part2 = 1,
- | part3 = 2,
- | part4 = 3.14,
- | part5 = 3.14,
- | part6 = 'abc',
- | part7 = true,
- | part8 = '2020-11-23',
- | part9 = '2020-11-23 22:13:10.123456'
- |""".stripMargin
- sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'")
- val expected = Map(
- "part0" -> "-1",
- "part1" -> "0",
- "part2" -> "1",
- "part3" -> "2",
- "part4" -> "3.14",
- "part5" -> "3.14",
- "part6" -> "abc",
- "part7" -> "true",
- "part8" -> "2020-11-23",
- "part9" -> "2020-11-23 22:13:10.123456")
- checkPartitions(t, expected)
- sql(s"ALTER TABLE $t DROP PARTITION ($partSpec)")
- checkPartitions(t) // no partitions
- }
- }
-
- test("SPARK-33676: not fully specified partition spec") {
- withNsTable("ns", "tbl") { t =>
- sql(s"""
- |CREATE TABLE $t (id bigint, part0 int, part1 string)
- |$defaultUsing
- |PARTITIONED BY (part0, part1)""".stripMargin)
- val errMsg = intercept[AnalysisException] {
- sql(s"ALTER TABLE $t ADD PARTITION (part0 = 1)")
- }.getMessage
- assert(errMsg.contains("Partition spec is invalid. " +
- "The spec (part0) must match the partition spec (part0, part1)"))
- }
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index f6268ef..4f79e71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -334,6 +334,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
testChangeColumn(isDatasourceTable = true)
}
+ test("alter table: add partition (datasource table)") {
+ testAddPartitions(isDatasourceTable = true)
+ }
+
test("alter table: drop partition (datasource table)") {
testDropPartitions(isDatasourceTable = true)
}
@@ -1618,6 +1622,63 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
+ protected def testAddPartitions(isDatasourceTable: Boolean): Unit = {
+ if (!isUsingHiveMetastore) {
+ assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
+ }
+ val catalog = spark.sessionState.catalog
+ val tableIdent = TableIdentifier("tab1", Some("dbx"))
+ val part1 = Map("a" -> "1", "b" -> "5")
+ val part2 = Map("a" -> "2", "b" -> "6")
+ val part3 = Map("a" -> "3", "b" -> "7")
+ val part4 = Map("a" -> "4", "b" -> "8")
+ val part5 = Map("a" -> "9", "b" -> "9")
+ createDatabase(catalog, "dbx")
+ createTable(catalog, tableIdent, isDatasourceTable)
+ createTablePartition(catalog, part1, tableIdent)
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+
+ // basic add partition
+ sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
+ "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+ assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined)
+
+ val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
+ assert(tableLocation.isDefined)
+ val partitionLocation = makeQualifiedPath(
+ new Path(tableLocation.get.toString, "paris").toString)
+
+ assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(partitionLocation))
+ assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined)
+
+ // add partitions without explicitly specifying database
+ catalog.setCurrentDatabase("dbx")
+ sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3, part4))
+
+ // table to alter does not exist
+ intercept[AnalysisException] {
+ sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (a='4', b='9')")
+ }
+
+ // partition to add already exists
+ intercept[AnalysisException] {
+ sql("ALTER TABLE tab1 ADD PARTITION (a='4', b='8')")
+ }
+
+ // partition to add already exists when using IF NOT EXISTS
+ sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3, part4))
+
+ // partition spec in ADD PARTITION should be case insensitive by default
+ sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+ Set(part1, part2, part3, part4, part5))
+ }
+
protected def testDropPartitions(isDatasourceTable: Boolean): Unit = {
if (!isUsingHiveMetastore) {
assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
deleted file mode 100644
index 295ce1d..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.v1
-
-import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.execution.command
-import org.apache.spark.sql.test.SharedSparkSession
-
-trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuiteBase {
- override def version: String = "V1"
- override def catalog: String = CatalogManager.SESSION_CATALOG_NAME
- override def defaultUsing: String = "USING parquet"
-
- override protected def checkLocation(
- t: String,
- spec: TablePartitionSpec,
- expected: String): Unit = {
- val tablePath = t.split('.')
- val tableName = tablePath.last
- val ns = tablePath.init.mkString(".")
- val partSpec = spec.map { case (key, value) => s"$key = $value"}.mkString(", ")
- val information = sql(s"SHOW TABLE EXTENDED IN $ns LIKE '$tableName' PARTITION($partSpec)")
- .select("information")
- .first().getString(0)
- val location = information.split("\\r?\\n").filter(_.startsWith("Location:")).head
- assert(location.endsWith(expected))
- }
-}
-
-class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession {
- test("partition already exists") {
- withNsTable("ns", "tbl") { t =>
- sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
- sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
- val errMsg = intercept[PartitionsAlreadyExistException] {
- sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
- " PARTITION (id=2) LOCATION 'loc1'")
- }.getMessage
- assert(errMsg.contains("The following partitions already exists"))
-
- sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
- " PARTITION (id=2) LOCATION 'loc1'")
- checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
- }
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
deleted file mode 100644
index b15235d..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.v2
-
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{PartitionsAlreadyExistException, ResolvePartitionSpec}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
-import org.apache.spark.sql.execution.command
-import org.apache.spark.sql.test.SharedSparkSession
-
-class AlterTableAddPartitionSuite
- extends command.AlterTableAddPartitionSuiteBase
- with SharedSparkSession {
-
- import CatalogV2Implicits._
-
- override def version: String = "V2"
- override def catalog: String = "test_catalog"
- override def defaultUsing: String = "USING _"
-
- override def sparkConf: SparkConf = super.sparkConf
- .set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName)
- .set(s"spark.sql.catalog.non_part_$catalog", classOf[InMemoryTableCatalog].getName)
-
- override protected def checkLocation(
- t: String,
- spec: TablePartitionSpec,
- expected: String): Unit = {
- val tablePath = t.split('.')
- val catalogName = tablePath.head
- val namespaceWithTable = tablePath.tail
- val namespaces = namespaceWithTable.init
- val tableName = namespaceWithTable.last
- val catalogPlugin = spark.sessionState.catalogManager.catalog(catalogName)
- val partTable = catalogPlugin.asTableCatalog
- .loadTable(Identifier.of(namespaces, tableName))
- .asInstanceOf[InMemoryPartitionTable]
- val ident = ResolvePartitionSpec.convertToPartIdent(spec, partTable.partitionSchema.fields)
- val partMetadata = partTable.loadPartitionMetadata(ident)
-
- assert(partMetadata.containsKey("location"))
- assert(partMetadata.get("location") === expected)
- }
-
- test("partition already exists") {
- withNsTable("ns", "tbl") { t =>
- sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
- sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
- val errMsg = intercept[PartitionsAlreadyExistException] {
- sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
- " PARTITION (id=2) LOCATION 'loc1'")
- }.getMessage
- assert(errMsg.contains("The following partitions already exists"))
-
- sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
- " PARTITION (id=2) LOCATION 'loc1'")
- checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
- }
- }
-
- test("SPARK-33650: add partition into a table which doesn't support partition management") {
- withNsTable("ns", "tbl", s"non_part_$catalog") { t =>
- sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
- val errMsg = intercept[AnalysisException] {
- sql(s"ALTER TABLE $t ADD PARTITION (id=1)")
- }.getMessage
- assert(errMsg.contains(s"Table $t can not alter partitions"))
- }
- }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 7302ee9..ce31e39 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -166,6 +166,10 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
testDropPartitions(isDatasourceTable = false)
}
+ test("alter table: add partition") {
+ testAddPartitions(isDatasourceTable = false)
+ }
+
test("drop table") {
testDropTable(isDatasourceTable = false)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
deleted file mode 100644
index ef0ec8d..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution.command
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.execution.command.v1
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-
-class AlterTableAddPartitionSuite
- extends v1.AlterTableAddPartitionSuiteBase
- with TestHiveSingleton {
- override def version: String = "Hive V1"
- override def defaultUsing: String = "USING HIVE"
-
- test("partition already exists") {
- withNsTable("ns", "tbl") { t =>
- sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
- sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
- val errMsg = intercept[AnalysisException] {
- sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
- " PARTITION (id=2) LOCATION 'loc1'")
- }.getMessage
- assert(errMsg.contains("already exists"))
-
- sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
- " PARTITION (id=2) LOCATION 'loc1'")
- checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org