Posted to commits@spark.apache.org by ma...@apache.org on 2021/11/10 12:22:25 UTC

[spark] branch master updated: [SPARK-37261][SQL] Allow adding partitions with ANSI intervals in DSv2

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a1267a  [SPARK-37261][SQL] Allow adding partitions with ANSI intervals in DSv2
2a1267a is described below

commit 2a1267aeb75bf838c74d1cf274aa258be060c17b
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Wed Nov 10 15:21:33 2021 +0300

    [SPARK-37261][SQL] Allow adding partitions with ANSI intervals in DSv2
    
    ### What changes were proposed in this pull request?
    In this PR, I propose to skip the check for ANSI interval types while creating or writing to a table using V2 catalogs. As a consequence, users can create tables in V2 catalogs partitioned by ANSI interval columns (the legacy intervals of `CalendarIntervalType` are still prohibited). This PR also adds a new test which checks:
    1. Adding a new partition with ANSI intervals via `ALTER TABLE .. ADD PARTITION`
    2. `INSERT INTO` a table partitioned by ANSI intervals
    
    for the V1/V2 In-Memory catalogs (the V1 Hive external catalog is skipped); see the sketch below.
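    
    For illustration only, a minimal Scala sketch of the user-facing behavior this change enables. It assumes a DSv2 catalog that supports partition management is registered as `testcat`; the catalog class, namespace, and table names are hypothetical and not part of this patch:
    ```
    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder()
      .master("local[*]")
      // Hypothetical DSv2 catalog implementation that supports partition management.
      .config("spark.sql.catalog.testcat", "com.example.PartitionedCatalog")
      .getOrCreate()
    
    // ANSI interval columns can now be used as partition columns in a V2 catalog.
    spark.sql(
      """CREATE TABLE testcat.ns.tbl (
        |  ym INTERVAL YEAR,
        |  dt INTERVAL DAY,
        |  data STRING)
        |PARTITIONED BY (ym, dt)""".stripMargin)
    
    // Adding a partition and writing to it with ANSI interval values now passes analysis;
    // columns of the legacy CalendarIntervalType are still rejected.
    spark.sql("ALTER TABLE testcat.ns.tbl ADD PARTITION (ym = INTERVAL '100' YEAR, dt = INTERVAL '10' DAY)")
    spark.sql("INSERT INTO testcat.ns.tbl PARTITION (ym = INTERVAL '100' YEAR, dt = INTERVAL '10' DAY) SELECT 'aaa'")
    spark.sql("SELECT ym, dt, data FROM testcat.ns.tbl").show()
    ```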
    
    ### Why are the changes needed?
    To allow users to save ANSI intervals as partition values using DSv2.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By running the new test for the V1/V2 In-Memory and V1 Hive external catalogs:
    ```
    $ build/sbt "test:testOnly org.apache.spark.sql.execution.command.v1.AlterTableAddPartitionSuite"
    $ build/sbt "test:testOnly org.apache.spark.sql.execution.command.v2.AlterTableAddPartitionSuite"
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly org.apache.spark.sql.hive.execution.command.AlterTableAddPartitionSuite"
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DataSourceV2SQLSuite"
    ```
    
    Closes #34537 from MaxGekk/alter-table-ansi-interval.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  6 ++--
 .../apache/spark/sql/catalyst/util/TypeUtils.scala |  4 +--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 16 +++++----
 .../command/AlterTableAddPartitionSuiteBase.scala  | 40 +++++++++++++++++++++-
 4 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 5bf37a2..1a105ad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -464,10 +464,12 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
               failAnalysis(s"Invalid partitioning: ${badReferences.mkString(", ")}")
             }
 
-            create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
+            create.tableSchema.foreach(f =>
+              TypeUtils.failWithIntervalType(f.dataType, forbidAnsiIntervals = false))
 
           case write: V2WriteCommand if write.resolved =>
-            write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
+            write.query.schema.foreach(f =>
+              TypeUtils.failWithIntervalType(f.dataType, forbidAnsiIntervals = false))
 
           case alter: AlterTableCommand =>
             checkAlterTableCommand(alter)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
index cba3a9a..144508c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
@@ -98,8 +98,8 @@ object TypeUtils {
     case _ => false
   }
 
-  def failWithIntervalType(dataType: DataType): Unit = {
-    invokeOnceForInterval(dataType, forbidAnsiIntervals = true) {
+  def failWithIntervalType(dataType: DataType, forbidAnsiIntervals: Boolean = true): Unit = {
+    invokeOnceForInterval(dataType, forbidAnsiIntervals) {
       throw QueryCompilationErrors.cannotUseIntervalTypeInTableSchemaError()
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index f03792f..499638c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -340,13 +340,15 @@ class DataSourceV2SQLSuite
   }
 
   test("CTAS/RTAS: invalid schema if has interval type") {
-    Seq("CREATE", "REPLACE").foreach { action =>
-      val e1 = intercept[AnalysisException](
-        sql(s"$action TABLE table_name USING $v2Format as select interval 1 day"))
-      assert(e1.getMessage.contains(s"Cannot use interval type in the table schema."))
-      val e2 = intercept[AnalysisException](
-        sql(s"$action TABLE table_name USING $v2Format as select array(interval 1 day)"))
-      assert(e2.getMessage.contains(s"Cannot use interval type in the table schema."))
+    withSQLConf(SQLConf.LEGACY_INTERVAL_ENABLED.key -> "true") {
+      Seq("CREATE", "REPLACE").foreach { action =>
+        val e1 = intercept[AnalysisException](
+          sql(s"$action TABLE table_name USING $v2Format as select interval 1 day"))
+        assert(e1.getMessage.contains(s"Cannot use interval type in the table schema."))
+        val e2 = intercept[AnalysisException](
+          sql(s"$action TABLE table_name USING $v2Format as select array(interval 1 day)"))
+        assert(e2.getMessage.contains(s"Cannot use interval type in the table schema."))
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
index e2e1591..dee1495 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import java.time.{Duration, Period}
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.internal.SQLConf
 
@@ -189,4 +191,40 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with DDLCommandTestUtils
       checkPartitions(t, Map("part" ->"2020-01-01"))
     }
   }
+
+  test("SPARK-37261: Add ANSI intervals as partition values") {
+    assume(!catalogVersion.contains("Hive")) // Hive catalog doesn't support the interval types
+
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(
+        s"""CREATE TABLE $t (
+           | ym INTERVAL YEAR,
+           | dt INTERVAL DAY,
+           | data STRING) $defaultUsing
+           |PARTITIONED BY (ym, dt)""".stripMargin)
+      sql(
+        s"""ALTER TABLE $t ADD PARTITION (
+           | ym = INTERVAL '100' YEAR,
+           | dt = INTERVAL '10' DAY
+           |) LOCATION 'loc'""".stripMargin)
+
+      checkPartitions(t, Map("ym" -> "INTERVAL '100' YEAR", "dt" -> "INTERVAL '10' DAY"))
+      checkLocation(t, Map("ym" -> "INTERVAL '100' YEAR", "dt" -> "INTERVAL '10' DAY"), "loc")
+
+      sql(
+        s"""INSERT INTO $t PARTITION (
+           | ym = INTERVAL '100' YEAR,
+           | dt = INTERVAL '10' DAY) SELECT 'aaa'""".stripMargin)
+      sql(
+        s"""INSERT INTO $t PARTITION (
+           | ym = INTERVAL '1' YEAR,
+           | dt = INTERVAL '-1' DAY) SELECT 'bbb'""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT ym, dt, data FROM $t"),
+        Seq(
+          Row(Period.ofYears(100), Duration.ofDays(10), "aaa"),
+          Row(Period.ofYears(1), Duration.ofDays(-1), "bbb")))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org