Posted to commits@spark.apache.org by we...@apache.org on 2021/11/08 15:16:30 UTC

[spark] branch master updated: [SPARK-37225][SQL] Support reading and writing ANSI intervals from/to Avro datasources

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 562f43c  [SPARK-37225][SQL] Support reading and writing ANSI intervals from/to Avro datasources
562f43c is described below

commit 562f43c945315aaa0d0979ec1a16bfb5616dd053
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon Nov 8 23:15:38 2021 +0800

    [SPARK-37225][SQL] Support reading and writing ANSI intervals from/to Avro datasources
    
    ### What changes were proposed in this pull request?
    Allow saving and loading of ANSI intervals - `YearMonthIntervalType` and `DayTimeIntervalType` - to/from the **Avro** datasource. After the changes, Spark saves ANSI intervals as primitive physical Avro types:
    - year-month intervals as `int`
    - day-time intervals as `long`
    
    without any modification of the underlying values. To load the values back as intervals, Spark stores the interval type information in the extra schema property `spark.sql.catalyst.type`:
    ```
    $ java -jar avro-tools-1.9.2.jar getmeta part-00000-28176110-d0dc-4de7-81b8-31797c15c678-c000.avro
    avro.schema	{"type":"record","name":"topLevelRecord","fields":[{"name":"i","type":{"type":"int","spark.sql.catalyst.type":"interval year to month"}}]}
    org.apache.spark.version	3.3.0
    avro.codec	snappy
    ```
    
    **Note:** This PR focuses on supporting ANSI intervals in the Avro datasource when they are written or read as columns of a `Dataset`.
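    
    Just to illustrate the physical layout, here is a minimal sketch (not part of this PR) that reads the raw values back with the plain Avro Java API. It assumes year-month intervals are persisted as the number of months (`int`) and day-time intervals as microseconds (`long`), and uses a hypothetical part-file name:
    ```scala
    import java.io.File
    
    import org.apache.avro.file.DataFileReader
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    
    // Open one of the Avro part files written by Spark (hypothetical name).
    val reader = new DataFileReader(
      new File("part-00000-....avro"), new GenericDatumReader[GenericRecord]())
    while (reader.hasNext) {
      // For a year-month interval column "i", this prints the raw month count,
      // e.g. 14 for INTERVAL '1-2' YEAR TO MONTH (assumption about the encoding).
      println(reader.next().get("i"))
    }
    reader.close()
    ```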
    
    ### Why are the changes needed?
    To improve the user experience with Spark SQL. At the moment, users can create ANSI intervals "inside" Spark, parallelize Java collections of `Period`/`Duration` objects, or save/load intervals to/from the JSON, CSV, ORC and Parquet datasources, but they cannot save the intervals to **Avro** files. After the changes, users can save datasets/dataframes with year-month/day-time intervals and load them back later with Apache Spark.
    
    For example:
    ```scala
    scala> sql("select date'today' - date'2021-01-01' as diff").write.format("avro").save("/Users/maximgekk/tmp/avro_interval")
    
    scala> val readback = spark.read.format("avro").load("/Users/maximgekk/tmp/avro_interval")
    readback: org.apache.spark.sql.DataFrame = [diff: interval day]
    
    scala> readback.printSchema
    root
     |-- diff: interval day (nullable = true)
    
    scala> readback.show
    +------------------+
    |              diff|
    +------------------+
    |INTERVAL '309' DAY|
    +------------------+
    ```
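    
    And, as a hedged sketch not taken from this PR, a similar round trip for a year-month interval built from `java.time.Period` objects (the `spark` session, its implicits, and the output path are assumptions):
    ```scala
    import java.time.Period
    
    import spark.implicits._
    
    // Hypothetical output path; any writable location works.
    val path = "/tmp/avro_ym_interval"
    // The Period encoder maps to YearMonthIntervalType (default column name "value").
    Seq(Period.ofYears(1).plusMonths(2)).toDS().write.format("avro").save(path)
    // Expected to read back with the interval type restored from the Avro metadata.
    spark.read.format("avro").load(path).printSchema()
    ```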
    
    ### Does this PR introduce _any_ user-facing change?
    In some sense, yes. Before the changes, users got an error when saving ANSI intervals as dataframe columns to Avro files; after the changes, the operation completes successfully.
    
    ### How was this patch tested?
    By running the new and affected tests:
    ```
    $ build/sbt "test:testOnly *AvroV1Suite"
    $ build/sbt "test:testOnly *AvroV2Suite"
    $ build/sbt "test:testOnly *FileBasedDataSourceSuite"
    $ build/sbt -Phive-2.3 "test:testOnly *HiveOrcSourceSuite"
    ```
    
    Closes #34503 from MaxGekk/avro-ansi-intervals-2.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  6 +++
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  6 +++
 .../org/apache/spark/sql/avro/AvroUtils.scala      |  2 -
 .../apache/spark/sql/avro/SchemaConverters.scala   | 31 +++++++++++-
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 57 +++++++++++++++++-----
 .../sql/execution/datasources/DataSource.scala     |  8 +--
 .../datasources/CommonFileDataSourceSuite.scala    | 28 +----------
 .../spark/sql/hive/orc/HiveOrcSourceSuite.scala    |  2 +-
 8 files changed, 88 insertions(+), 52 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 9a27215..d7f2fa8 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -330,6 +330,12 @@ private[sql] class AvroDeserializer(
           (updater, ordinal, _) => updater.setNullAt(ordinal)
         }
 
+      case (INT, _: YearMonthIntervalType) => (updater, ordinal, value) =>
+        updater.setInt(ordinal, value.asInstanceOf[Int])
+
+      case (LONG, _: DayTimeIntervalType) => (updater, ordinal, value) =>
+        updater.setLong(ordinal, value.asInstanceOf[Long])
+
       case _ => throw new IncompatibleSchemaException(incompatibleMsg)
     }
   }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index e9ad709..32a84d0 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -240,6 +240,12 @@ private[sql] class AvroSerializer(
           }
           result
 
+      case (_: YearMonthIntervalType, INT) =>
+        (getter, ordinal) => getter.getInt(ordinal)
+
+      case (_: DayTimeIntervalType, LONG) =>
+        (getter, ordinal) => getter.getLong(ordinal)
+
       case _ =>
         throw new IncompatibleSchemaException(errorPrefix +
           s"schema is incompatible (sqlType = ${catalystType.sql}, avroType = $avroType)")
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index f830bbc..149d0b6 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -71,8 +71,6 @@ private[sql] object AvroUtils extends Logging {
   }
 
   def supportsDataType(dataType: DataType): Boolean = dataType match {
-    case _: AnsiIntervalType => false
-
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index 347364c..aea2d99 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -24,6 +24,7 @@ import org.apache.avro.LogicalTypes.{Date, Decimal, LocalTimestampMicros, LocalT
 import org.apache.avro.Schema.Type._
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.Decimal.minBytesForPrecision
 
@@ -51,11 +52,21 @@ object SchemaConverters {
     toSqlTypeHelper(avroSchema, Set.empty)
   }
 
+  // The property specifies Catalyst type of the given field
+  private val CATALYST_TYPE_PROP_NAME = "spark.sql.catalyst.type"
+
   private def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
     avroSchema.getType match {
       case INT => avroSchema.getLogicalType match {
         case _: Date => SchemaType(DateType, nullable = false)
-        case _ => SchemaType(IntegerType, nullable = false)
+        case _ =>
+          val catalystTypeAttrValue = avroSchema.getProp(CATALYST_TYPE_PROP_NAME)
+          val catalystType = if (catalystTypeAttrValue == null) {
+            IntegerType
+          } else {
+            CatalystSqlParser.parseDataType(catalystTypeAttrValue)
+          }
+          SchemaType(catalystType, nullable = false)
       }
       case STRING => SchemaType(StringType, nullable = false)
       case BOOLEAN => SchemaType(BooleanType, nullable = false)
@@ -72,7 +83,14 @@ object SchemaConverters {
         case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false)
         case _: LocalTimestampMillis | _: LocalTimestampMicros =>
           SchemaType(TimestampNTZType, nullable = false)
-        case _ => SchemaType(LongType, nullable = false)
+        case _ =>
+          val catalystTypeAttrValue = avroSchema.getProp(CATALYST_TYPE_PROP_NAME)
+          val catalystType = if (catalystTypeAttrValue == null) {
+            LongType
+          } else {
+            CatalystSqlParser.parseDataType(catalystTypeAttrValue)
+          }
+          SchemaType(catalystType, nullable = false)
       }
 
       case ENUM => SchemaType(StringType, nullable = false)
@@ -195,6 +213,15 @@ object SchemaConverters {
         }
         fieldsAssembler.endRecord()
 
+      case ym: YearMonthIntervalType =>
+        val ymIntervalType = builder.intType()
+        ymIntervalType.addProp(CATALYST_TYPE_PROP_NAME, ym.typeName)
+        ymIntervalType
+      case dt: DayTimeIntervalType =>
+        val dtIntervalType = builder.longType()
+        dtIntervalType.addProp(CATALYST_TYPE_PROP_NAME, dt.typeName)
+        dtIntervalType
+
       // This should never happen.
       case other => throw new IncompatibleSchemaException(s"Unexpected type $other.")
     }
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 600386b..b045d17 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -1172,20 +1172,22 @@ abstract class AvroSuite
   }
 
   test("error handling for unsupported Interval data types") {
-    withTempDir { dir =>
-      val tempDir = new File(dir, "files").getCanonicalPath
-      var msg = intercept[AnalysisException] {
-        sql("select interval 1 days").write.format("avro").mode("overwrite").save(tempDir)
-      }.getMessage
-      assert(msg.contains("Cannot save interval data type into external storage.") ||
-        msg.contains("AVRO data source does not support interval data type."))
+    withSQLConf(SQLConf.LEGACY_INTERVAL_ENABLED.key -> "true") {
+      withTempDir { dir =>
+        val tempDir = new File(dir, "files").getCanonicalPath
+        var msg = intercept[AnalysisException] {
+          sql("select interval 1 days").write.format("avro").mode("overwrite").save(tempDir)
+        }.getMessage
+        assert(msg.contains("Cannot save interval data type into external storage.") ||
+          msg.contains("AVRO data source does not support interval data type."))
 
-      msg = intercept[AnalysisException] {
-        spark.udf.register("testType", () => new IntervalData())
-        sql("select testType()").write.format("avro").mode("overwrite").save(tempDir)
-      }.getMessage
-      assert(msg.toLowerCase(Locale.ROOT)
-        .contains(s"avro data source does not support interval data type."))
+        msg = intercept[AnalysisException] {
+          spark.udf.register("testType", () => new IntervalData())
+          sql("select testType()").write.format("avro").mode("overwrite").save(tempDir)
+        }.getMessage
+        assert(msg.toLowerCase(Locale.ROOT)
+          .contains(s"avro data source does not support interval data type."))
+      }
     }
   }
 
@@ -2188,6 +2190,35 @@ abstract class AvroSuite
       }
     }
   }
+
+  test("SPARK-37225: Support reading and writing ANSI intervals") {
+    Seq(
+      YearMonthIntervalType() -> ((i: Int) => java.time.Period.of(i, i, 0)),
+      DayTimeIntervalType() -> ((i: Int) => java.time.Duration.ofDays(i).plusSeconds(i))
+    ).foreach { case (it, f) =>
+      val data = (1 to 10).map(i => Row(i, f(i)))
+      val schema = StructType(Array(StructField("d", IntegerType, false),
+        StructField("i", it, false)))
+      withTempPath { file =>
+        val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+        df.write.format("avro").save(file.getCanonicalPath)
+        val df2 = spark.read.format("avro").load(file.getCanonicalPath)
+        checkAnswer(df2, df.collect().toSeq)
+      }
+    }
+
+    // Tests for ANSI intervals in complex types.
+    withTempPath { file =>
+      val df = spark.sql(
+        """SELECT
+          |  named_struct('interval', interval '1-2' year to month) a,
+          |  array(interval '1 2:3' day to minute) b,
+          |  map('key', interval '10' year) c""".stripMargin)
+      df.write.format("avro").save(file.getCanonicalPath)
+      val df2 = spark.read.format("avro").load(file.getCanonicalPath)
+      checkAnswer(df2, df.collect().toSeq)
+    }
+  }
 }
 
 class AvroV1Suite extends AvroSuite {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 9936126..80ab07b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -579,17 +579,11 @@ case class DataSource(
       checkEmptyGlobPath, checkFilesExist, enableGlobbing = globPaths)
   }
 
-  // TODO: Remove the Set below once all the built-in datasources support ANSI interval types
-  private val writeAllowedSources: Set[Class[_]] =
-    Set(classOf[ParquetFileFormat], classOf[CSVFileFormat],
-      classOf[JsonFileFormat], classOf[OrcFileFormat])
-
   private def disallowWritingIntervals(
       dataTypes: Seq[DataType],
       forbidAnsiIntervals: Boolean): Unit = {
-    val isWriteAllowedSource = writeAllowedSources(providingClass)
     dataTypes.foreach(
-      TypeUtils.invokeOnceForInterval(_, forbidAnsiIntervals || !isWriteAllowedSource) {
+      TypeUtils.invokeOnceForInterval(_, forbidAnsiIntervals) {
       throw QueryCompilationErrors.cannotSaveIntervalIntoExternalStorageError()
     })
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
index 854463d3..b7d0a7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.util.Locale
-
 import org.scalatest.funsuite.AnyFunSuite
 
-import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, FakeFileSystemRequiringDSOption, SparkSession}
+import org.apache.spark.sql.{Dataset, Encoders, FakeFileSystemRequiringDSOption, SparkSession}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 
 /**
@@ -35,30 +33,6 @@ trait CommonFileDataSourceSuite extends SQLHelper { self: AnyFunSuite =>
   protected def dataSourceFormat: String
   protected def inputDataset: Dataset[_] = spark.createDataset(Seq("abc"))(Encoders.STRING)
 
-  test(s"SPARK-36349: disallow saving of ANSI intervals to $dataSourceFormat") {
-    if (!Set("parquet", "csv", "json", "orc").contains(dataSourceFormat.toLowerCase(Locale.ROOT))) {
-      Seq("INTERVAL '1' DAY", "INTERVAL '1' YEAR").foreach { i =>
-        withTempPath { dir =>
-          val errMsg = intercept[AnalysisException] {
-            spark.sql(s"SELECT $i").write.format(dataSourceFormat).save(dir.getAbsolutePath)
-          }.getMessage
-          assert(errMsg.contains("Cannot save interval data type into external storage"))
-        }
-      }
-
-      // Check all built-in file-based datasources except of libsvm which
-      // requires particular schema.
-      if (!Set("libsvm").contains(dataSourceFormat.toLowerCase(Locale.ROOT))) {
-        Seq("INTERVAL DAY TO SECOND", "INTERVAL YEAR TO MONTH").foreach { it =>
-          val errMsg = intercept[AnalysisException] {
-            spark.sql(s"CREATE TABLE t (i $it) USING $dataSourceFormat")
-          }.getMessage
-          assert(errMsg.contains("data source does not support"))
-        }
-      }
-    }
-  }
-
   test(s"Propagate Hadoop configs from $dataSourceFormat options to underlying file system") {
     withSQLConf(
       "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
index a66c337..574281a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
@@ -116,7 +116,7 @@ class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
       var msg = intercept[AnalysisException] {
         sql("select interval 1 days").write.mode("overwrite").orc(orcDir)
       }.getMessage
-      assert(msg.contains("Cannot save interval data type into external storage."))
+      assert(msg.contains("ORC data source does not support interval day data type"))
 
       msg = intercept[AnalysisException] {
         sql("select null").write.mode("overwrite").orc(orcDir)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org