You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2021/08/03 17:31:44 UTC
[spark] branch branch-3.2 updated: [SPARK-36349][SQL] Disallow ANSI
intervals in file-based datasources
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new bd33408 [SPARK-36349][SQL] Disallow ANSI intervals in file-based datasources
bd33408 is described below
commit bd33408b4b5aefc5b83ab1355bb0c1faacad190c
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Tue Aug 3 20:30:20 2021 +0300
[SPARK-36349][SQL] Disallow ANSI intervals in file-based datasources
### What changes were proposed in this pull request?
In the PR, I propose to ban `YearMonthIntervalType` and `DayTimeIntervalType` at the analysis phase while creating a table using a built-in file-based datasource or writing a dataset to such datasource. In particular, add the following case:
```scala
case _: DayTimeIntervalType | _: YearMonthIntervalType => false
```
to all methods that override either:
- V2 `FileTable.supportsDataType()`
- V1 `FileFormat.supportDataType()`
### Why are the changes needed?
To improve user experience with Spark SQL, and output a proper error message at the analysis phase.
### Does this PR introduce _any_ user-facing change?
Yes, but ANSI interval types haven't been released yet. So, for users this is new behavior.
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt -Phive-2.3 "test:testOnly *HiveOrcSourceSuite"
```
Closes #33580 from MaxGekk/interval-ban-in-ds.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Max Gekk <ma...@gmail.com>
(cherry picked from commit 67cbc932638179925ebbeb76d6d6e6f25a3cb2e2)
Signed-off-by: Max Gekk <ma...@gmail.com>
---
.../org/apache/spark/sql/avro/AvroUtils.scala | 2 ++
.../execution/datasources/csv/CSVFileFormat.scala | 2 ++
.../datasources/json/JsonFileFormat.scala | 2 ++
.../execution/datasources/orc/OrcFileFormat.scala | 2 ++
.../datasources/parquet/ParquetFileFormat.scala | 2 ++
.../execution/datasources/v2/csv/CSVTable.scala | 4 +++-
.../execution/datasources/v2/json/JsonTable.scala | 2 ++
.../execution/datasources/v2/orc/OrcTable.scala | 2 ++
.../datasources/v2/parquet/ParquetTable.scala | 2 ++
.../datasources/CommonFileDataSourceSuite.scala | 25 +++++++++++++++++++++-
.../apache/spark/sql/hive/orc/OrcFileFormat.scala | 2 ++
11 files changed, 45 insertions(+), 2 deletions(-)
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index f09af74..68b393e 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -71,6 +71,8 @@ private[sql] object AvroUtils extends Logging {
}
def supportsDataType(dataType: DataType): Boolean = dataType match {
+ case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
case _: AtomicType => true
case st: StructType => st.forall { f => supportsDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index d40ad9d..c3a8a95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -148,6 +148,8 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]
override def supportDataType(dataType: DataType): Boolean = dataType match {
+ case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
case _: AtomicType => true
case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 9c6c77a..7ffeba4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -134,6 +134,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
override def supportDataType(dataType: DataType): Boolean = dataType match {
+ case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
case _: AtomicType => true
case st: StructType => st.forall { f => supportDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 85c0ff0..108b216 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -247,6 +247,8 @@ class OrcFileFormat
}
override def supportDataType(dataType: DataType): Boolean = dataType match {
+ case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
case _: AtomicType => true
case st: StructType => st.forall { f => supportDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 586952a..e2fe5b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -373,6 +373,8 @@ class ParquetFileFormat
}
override def supportDataType(dataType: DataType): Boolean = dataType match {
+ case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
case _: AtomicType => true
case st: StructType => st.forall { f => supportDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
index 839cd01..e6299be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuild
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
import org.apache.spark.sql.execution.datasources.v2.FileTable
-import org.apache.spark.sql.types.{AtomicType, DataType, StructType, UserDefinedType}
+import org.apache.spark.sql.types.{AtomicType, DataType, DayTimeIntervalType, StructType, UserDefinedType, YearMonthIntervalType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
case class CSVTable(
@@ -55,6 +55,8 @@ case class CSVTable(
}
override def supportsDataType(dataType: DataType): Boolean = dataType match {
+ case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
case _: AtomicType => true
case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
index 5216800..38277fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
@@ -55,6 +55,8 @@ case class JsonTable(
}
override def supportsDataType(dataType: DataType): Boolean = dataType match {
+ case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
case _: AtomicType => true
case st: StructType => st.forall { f => supportsDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
index 9cc4525..bffc091 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -49,6 +49,8 @@ case class OrcTable(
}
override def supportsDataType(dataType: DataType): Boolean = dataType match {
+ case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
case _: AtomicType => true
case st: StructType => st.forall { f => supportsDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
index c8bb4b2..78797f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
@@ -49,6 +49,8 @@ case class ParquetTable(
}
override def supportsDataType(dataType: DataType): Boolean = dataType match {
+ case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
case _: AtomicType => true
case st: StructType => st.forall { f => supportsDataType(f.dataType) }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
index b7d0a7f..e59bc05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
@@ -17,9 +17,11 @@
package org.apache.spark.sql.execution.datasources
+import java.util.Locale
+
import org.scalatest.funsuite.AnyFunSuite
-import org.apache.spark.sql.{Dataset, Encoders, FakeFileSystemRequiringDSOption, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, FakeFileSystemRequiringDSOption, SparkSession}
import org.apache.spark.sql.catalyst.plans.SQLHelper
/**
@@ -33,6 +35,27 @@ trait CommonFileDataSourceSuite extends SQLHelper { self: AnyFunSuite =>
protected def dataSourceFormat: String
protected def inputDataset: Dataset[_] = spark.createDataset(Seq("abc"))(Encoders.STRING)
+ test(s"SPARK-36349: disallow saving of ANSI intervals to $dataSourceFormat") {
+ Seq("INTERVAL '1' DAY", "INTERVAL '1' YEAR").foreach { i =>
+ withTempPath { dir =>
+ val errMsg = intercept[AnalysisException] {
+ spark.sql(s"SELECT $i").write.format(dataSourceFormat).save(dir.getAbsolutePath)
+ }.getMessage
+ assert(errMsg.contains("Cannot save interval data type into external storage"))
+ }
+ }
+
+ // Check all built-in file-based datasources except of libsvm which requires particular schema.
+ if (!Set("libsvm").contains(dataSourceFormat.toLowerCase(Locale.ROOT))) {
+ Seq("INTERVAL DAY TO SECOND", "INTERVAL YEAR TO MONTH").foreach { it =>
+ val errMsg = intercept[AnalysisException] {
+ spark.sql(s"CREATE TABLE t (i $it) USING $dataSourceFormat")
+ }.getMessage
+ assert(errMsg.contains("data source does not support"))
+ }
+ }
+ }
+
test(s"Propagate Hadoop configs from $dataSourceFormat options to underlying file system") {
withSQLConf(
"fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index c50ecf7..2ca1eb3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -194,6 +194,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
}
override def supportDataType(dataType: DataType): Boolean = dataType match {
+ case _: DayTimeIntervalType | _: YearMonthIntervalType => false
+
case _: AtomicType => true
case st: StructType => st.forall { f => supportDataType(f.dataType) }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org