You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2021/08/03 17:31:44 UTC

[spark] branch branch-3.2 updated: [SPARK-36349][SQL] Disallow ANSI intervals in file-based datasources

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new bd33408  [SPARK-36349][SQL] Disallow ANSI intervals in file-based datasources
bd33408 is described below

commit bd33408b4b5aefc5b83ab1355bb0c1faacad190c
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Tue Aug 3 20:30:20 2021 +0300

    [SPARK-36349][SQL] Disallow ANSI intervals in file-based datasources
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to ban `YearMonthIntervalType` and `DayTimeIntervalType` at the analysis phase while creating a table using a built-in filed-based datasource or writing a dataset to such datasource. In particular, add the following case:
    ```scala
    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
    ```
    to all methods that override either:
    - V2 `FileTable.supportsDataType()`
    - V1 `FileFormat.supportDataType()`
    
    ### Why are the changes needed?
    To improve user experience with Spark SQL, and output a proper error message at the analysis phase.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes but ANSI interval types haven't released yet. So, for users this is new behavior.
    
    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt -Phive-2.3 "test:testOnly *HiveOrcSourceSuite"
    ```
    
    Closes #33580 from MaxGekk/interval-ban-in-ds.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
    (cherry picked from commit 67cbc932638179925ebbeb76d6d6e6f25a3cb2e2)
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../org/apache/spark/sql/avro/AvroUtils.scala      |  2 ++
 .../execution/datasources/csv/CSVFileFormat.scala  |  2 ++
 .../datasources/json/JsonFileFormat.scala          |  2 ++
 .../execution/datasources/orc/OrcFileFormat.scala  |  2 ++
 .../datasources/parquet/ParquetFileFormat.scala    |  2 ++
 .../execution/datasources/v2/csv/CSVTable.scala    |  4 +++-
 .../execution/datasources/v2/json/JsonTable.scala  |  2 ++
 .../execution/datasources/v2/orc/OrcTable.scala    |  2 ++
 .../datasources/v2/parquet/ParquetTable.scala      |  2 ++
 .../datasources/CommonFileDataSourceSuite.scala    | 25 +++++++++++++++++++++-
 .../apache/spark/sql/hive/orc/OrcFileFormat.scala  |  2 ++
 11 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index f09af74..68b393e 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -71,6 +71,8 @@ private[sql] object AvroUtils extends Logging {
   }
 
   def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index d40ad9d..c3a8a95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -148,6 +148,8 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
   override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 9c6c77a..7ffeba4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -134,6 +134,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
   override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 85c0ff0..108b216 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -247,6 +247,8 @@ class OrcFileFormat
   }
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 586952a..e2fe5b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -373,6 +373,8 @@ class ParquetFileFormat
   }
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
index 839cd01..e6299be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuild
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
 import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.{AtomicType, DataType, StructType, UserDefinedType}
+import org.apache.spark.sql.types.{AtomicType, DataType, DayTimeIntervalType, StructType, UserDefinedType, YearMonthIntervalType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 case class CSVTable(
@@ -55,6 +55,8 @@ case class CSVTable(
     }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
index 5216800..38277fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
@@ -55,6 +55,8 @@ case class JsonTable(
     }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
index 9cc4525..bffc091 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -49,6 +49,8 @@ case class OrcTable(
     }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
index c8bb4b2..78797f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
@@ -49,6 +49,8 @@ case class ParquetTable(
     }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
index b7d0a7f..e59bc05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.Locale
+
 import org.scalatest.funsuite.AnyFunSuite
 
-import org.apache.spark.sql.{Dataset, Encoders, FakeFileSystemRequiringDSOption, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, FakeFileSystemRequiringDSOption, SparkSession}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 
 /**
@@ -33,6 +35,27 @@ trait CommonFileDataSourceSuite extends SQLHelper { self: AnyFunSuite =>
   protected def dataSourceFormat: String
   protected def inputDataset: Dataset[_] = spark.createDataset(Seq("abc"))(Encoders.STRING)
 
+  test(s"SPARK-36349: disallow saving of ANSI intervals to $dataSourceFormat") {
+    Seq("INTERVAL '1' DAY", "INTERVAL '1' YEAR").foreach { i =>
+      withTempPath { dir =>
+        val errMsg = intercept[AnalysisException] {
+          spark.sql(s"SELECT $i").write.format(dataSourceFormat).save(dir.getAbsolutePath)
+        }.getMessage
+        assert(errMsg.contains("Cannot save interval data type into external storage"))
+      }
+    }
+
+    // Check all built-in file-based datasources except of libsvm which requires particular schema.
+    if (!Set("libsvm").contains(dataSourceFormat.toLowerCase(Locale.ROOT))) {
+      Seq("INTERVAL DAY TO SECOND", "INTERVAL YEAR TO MONTH").foreach { it =>
+        val errMsg = intercept[AnalysisException] {
+          spark.sql(s"CREATE TABLE t (i $it) USING $dataSourceFormat")
+        }.getMessage
+        assert(errMsg.contains("data source does not support"))
+      }
+    }
+  }
+
   test(s"Propagate Hadoop configs from $dataSourceFormat options to underlying file system") {
     withSQLConf(
       "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index c50ecf7..2ca1eb3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -194,6 +194,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
   }
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
+    case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportDataType(f.dataType) }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org