You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2021/11/10 12:22:25 UTC
[spark] branch master updated: [SPARK-37261][SQL] Allow adding
partitions with ANSI intervals in DSv2
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2a1267a [SPARK-37261][SQL] Allow adding partitions with ANSI intervals in DSv2
2a1267a is described below
commit 2a1267aeb75bf838c74d1cf274aa258be060c17b
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Wed Nov 10 15:21:33 2021 +0300
[SPARK-37261][SQL] Allow adding partitions with ANSI intervals in DSv2
### What changes were proposed in this pull request?
In the PR, I propose to skip the check for ANSI interval types when creating or writing to a table using V2 catalogs. As a consequence, users can create tables in V2 catalogs partitioned by ANSI interval columns (the legacy intervals of `CalendarIntervalType` are still prohibited). This PR also adds a new test which checks:
1. Adding new partition with ANSI intervals via `ALTER TABLE .. ADD PARTITION`
2. INSERT INTO a table partitioned by ANSI intervals
for the V1/V2 In-Memory catalogs (the V1 Hive external catalog is skipped).
### Why are the changes needed?
To allow users to save ANSI intervals as partition values using DSv2.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By running the new test for the V1/V2 In-Memory and V1 Hive external catalogs:
```
$ build/sbt "test:testOnly org.apache.spark.sql.execution.command.v1.AlterTableAddPartitionSuite"
$ build/sbt "test:testOnly org.apache.spark.sql.execution.command.v2.AlterTableAddPartitionSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly org.apache.spark.sql.hive.execution.command.AlterTableAddPartitionSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DataSourceV2SQLSuite"
```
Closes #34537 from MaxGekk/alter-table-ansi-interval.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Max Gekk <ma...@gmail.com>
---
.../sql/catalyst/analysis/CheckAnalysis.scala | 6 ++--
.../apache/spark/sql/catalyst/util/TypeUtils.scala | 4 +--
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 16 +++++----
.../command/AlterTableAddPartitionSuiteBase.scala | 40 +++++++++++++++++++++-
4 files changed, 54 insertions(+), 12 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 5bf37a2..1a105ad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -464,10 +464,12 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
failAnalysis(s"Invalid partitioning: ${badReferences.mkString(", ")}")
}
- create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
+ create.tableSchema.foreach(f =>
+ TypeUtils.failWithIntervalType(f.dataType, forbidAnsiIntervals = false))
case write: V2WriteCommand if write.resolved =>
- write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
+ write.query.schema.foreach(f =>
+ TypeUtils.failWithIntervalType(f.dataType, forbidAnsiIntervals = false))
case alter: AlterTableCommand =>
checkAlterTableCommand(alter)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
index cba3a9a..144508c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
@@ -98,8 +98,8 @@ object TypeUtils {
case _ => false
}
- def failWithIntervalType(dataType: DataType): Unit = {
- invokeOnceForInterval(dataType, forbidAnsiIntervals = true) {
+ def failWithIntervalType(dataType: DataType, forbidAnsiIntervals: Boolean = true): Unit = {
+ invokeOnceForInterval(dataType, forbidAnsiIntervals) {
throw QueryCompilationErrors.cannotUseIntervalTypeInTableSchemaError()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index f03792f..499638c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -340,13 +340,15 @@ class DataSourceV2SQLSuite
}
test("CTAS/RTAS: invalid schema if has interval type") {
- Seq("CREATE", "REPLACE").foreach { action =>
- val e1 = intercept[AnalysisException](
- sql(s"$action TABLE table_name USING $v2Format as select interval 1 day"))
- assert(e1.getMessage.contains(s"Cannot use interval type in the table schema."))
- val e2 = intercept[AnalysisException](
- sql(s"$action TABLE table_name USING $v2Format as select array(interval 1 day)"))
- assert(e2.getMessage.contains(s"Cannot use interval type in the table schema."))
+ withSQLConf(SQLConf.LEGACY_INTERVAL_ENABLED.key -> "true") {
+ Seq("CREATE", "REPLACE").foreach { action =>
+ val e1 = intercept[AnalysisException](
+ sql(s"$action TABLE table_name USING $v2Format as select interval 1 day"))
+ assert(e1.getMessage.contains(s"Cannot use interval type in the table schema."))
+ val e2 = intercept[AnalysisException](
+ sql(s"$action TABLE table_name USING $v2Format as select array(interval 1 day)"))
+ assert(e2.getMessage.contains(s"Cannot use interval type in the table schema."))
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
index e2e1591..dee1495 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import java.time.{Duration, Period}
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
import org.apache.spark.sql.internal.SQLConf
@@ -189,4 +191,40 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with DDLCommandTestUtils
checkPartitions(t, Map("part" ->"2020-01-01"))
}
}
+
+ test("SPARK-37261: Add ANSI intervals as partition values") {
+ assume(!catalogVersion.contains("Hive")) // Hive catalog doesn't support the interval types
+
+ withNamespaceAndTable("ns", "tbl") { t =>
+ sql(
+ s"""CREATE TABLE $t (
+ | ym INTERVAL YEAR,
+ | dt INTERVAL DAY,
+ | data STRING) $defaultUsing
+ |PARTITIONED BY (ym, dt)""".stripMargin)
+ sql(
+ s"""ALTER TABLE $t ADD PARTITION (
+ | ym = INTERVAL '100' YEAR,
+ | dt = INTERVAL '10' DAY
+ |) LOCATION 'loc'""".stripMargin)
+
+ checkPartitions(t, Map("ym" -> "INTERVAL '100' YEAR", "dt" -> "INTERVAL '10' DAY"))
+ checkLocation(t, Map("ym" -> "INTERVAL '100' YEAR", "dt" -> "INTERVAL '10' DAY"), "loc")
+
+ sql(
+ s"""INSERT INTO $t PARTITION (
+ | ym = INTERVAL '100' YEAR,
+ | dt = INTERVAL '10' DAY) SELECT 'aaa'""".stripMargin)
+ sql(
+ s"""INSERT INTO $t PARTITION (
+ | ym = INTERVAL '1' YEAR,
+ | dt = INTERVAL '-1' DAY) SELECT 'bbb'""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT ym, dt, data FROM $t"),
+ Seq(
+ Row(Period.ofYears(100), Duration.ofDays(10), "aaa"),
+ Row(Period.ofYears(1), Duration.ofDays(-1), "bbb")))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org