You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/24 14:46:45 UTC

[spark] branch branch-3.3 updated: [SPARK-37463][SQL] Read/Write Timestamp ntz from/to Orc uses int64

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 5582f92  [SPARK-37463][SQL] Read/Write Timestamp ntz from/to Orc uses int64
5582f92 is described below

commit 5582f92046a3486dc6d30e6e4083446fdbd52667
Author: Jiaan Geng <be...@163.com>
AuthorDate: Thu Mar 24 22:43:32 2022 +0800

    [SPARK-37463][SQL] Read/Write Timestamp ntz from/to Orc uses int64
    
    ### What changes were proposed in this pull request?
    #33588 (comment) show Spark cannot read/write timestamp ntz and ltz correctly. Based on the discussion https://github.com/apache/spark/pull/34741#issuecomment-983660633, we just to fix read/write timestamp ntz to Orc uses int64.
    
    ### Why are the changes needed?
    Fix the bug about read/write timestamp ntz from/to Orc with different times zone.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Orc timestamp ntz is a new feature.
    
    ### How was this patch tested?
    New tests.
    
    Closes #34984 from beliefer/SPARK-37463-int64.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit e410d98f57750080ad46932cc9211d2cf5154c24)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../datasources/orc/OrcAtomicColumnVector.java     | 10 -----
 .../datasources/orc/OrcDeserializer.scala          |  5 +--
 .../execution/datasources/orc/OrcFileFormat.scala  |  9 ++---
 .../sql/execution/datasources/orc/OrcFilters.scala | 11 ++----
 .../execution/datasources/orc/OrcSerializer.scala  |  4 +-
 .../sql/execution/datasources/orc/OrcUtils.scala   | 46 ++++++----------------
 .../datasources/parquet/ParquetRowConverter.scala  |  4 ++
 .../v2/orc/OrcPartitionReaderFactory.scala         | 16 +++-----
 .../execution/datasources/v2/orc/OrcWrite.scala    |  2 +-
 .../execution/datasources/orc/OrcQuerySuite.scala  | 26 ++++++++++++
 10 files changed, 59 insertions(+), 74 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
index b4f7b99..c2d8334 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
@@ -27,7 +27,6 @@ import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
-import org.apache.spark.sql.types.TimestampNTZType;
 import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -37,7 +36,6 @@ import org.apache.spark.unsafe.types.UTF8String;
  */
 public class OrcAtomicColumnVector extends OrcColumnVector {
   private final boolean isTimestamp;
-  private final boolean isTimestampNTZ;
   private final boolean isDate;
 
   // Column vector for each type. Only 1 is populated for any type.
@@ -56,12 +54,6 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
       isTimestamp = false;
     }
 
-    if (type instanceof TimestampNTZType) {
-      isTimestampNTZ = true;
-    } else {
-      isTimestampNTZ = false;
-    }
-
     if (type instanceof DateType) {
       isDate = true;
     } else {
@@ -113,8 +105,6 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
     int index = getRowIndex(rowId);
     if (isTimestamp) {
       return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
-    } else if (isTimestampNTZ) {
-      return OrcUtils.fromOrcNTZ(timestampData.asScratchTimestamp(index));
     } else {
       return longData.vector[index];
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 0c2856c..564e42e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -105,7 +105,7 @@ class OrcDeserializer(
       case IntegerType | _: YearMonthIntervalType => (ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[IntWritable].get)
 
-      case LongType | _: DayTimeIntervalType => (ordinal, value) =>
+      case LongType | _: DayTimeIntervalType | _: TimestampNTZType => (ordinal, value) =>
         updater.setLong(ordinal, value.asInstanceOf[LongWritable].get)
 
       case FloatType => (ordinal, value) =>
@@ -129,9 +129,6 @@ class OrcDeserializer(
       case TimestampType => (ordinal, value) =>
         updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))
 
-      case TimestampNTZType => (ordinal, value) =>
-        updater.setLong(ordinal, OrcUtils.fromOrcNTZ(value.asInstanceOf[OrcTimestamp]))
-
       case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
         val v = OrcShimUtils.getDecimal(value)
         v.changePrecision(precision, scale)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 39a8763..2b060c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -142,11 +142,10 @@ class OrcFileFormat
 
       val fs = filePath.getFileSystem(conf)
       val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-      val resultedColPruneInfo =
-        Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
-          OrcUtils.requestedColumnIds(
-            isCaseSensitive, dataSchema, requiredSchema, reader, conf)
-        }
+      val orcSchema =
+        Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions))(_.getSchema)
+      val resultedColPruneInfo = OrcUtils.requestedColumnIds(
+        isCaseSensitive, dataSchema, requiredSchema, orcSchema, conf)
 
       if (resultedColPruneInfo.isEmpty) {
         Iterator.empty
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 0d85a45..4bb1c18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
-import java.sql.Timestamp
 import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
 
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
@@ -143,11 +142,11 @@ private[sql] object OrcFilters extends OrcFiltersBase {
   def getPredicateLeafType(dataType: DataType): PredicateLeaf.Type = dataType match {
     case BooleanType => PredicateLeaf.Type.BOOLEAN
     case ByteType | ShortType | IntegerType | LongType |
-         _: AnsiIntervalType => PredicateLeaf.Type.LONG
+         _: AnsiIntervalType | TimestampNTZType => PredicateLeaf.Type.LONG
     case FloatType | DoubleType => PredicateLeaf.Type.FLOAT
     case StringType => PredicateLeaf.Type.STRING
     case DateType => PredicateLeaf.Type.DATE
-    case TimestampType | TimestampNTZType => PredicateLeaf.Type.TIMESTAMP
+    case TimestampType => PredicateLeaf.Type.TIMESTAMP
     case _: DecimalType => PredicateLeaf.Type.DECIMAL
     case _ => throw QueryExecutionErrors.unsupportedOperationForDataTypeError(dataType)
   }
@@ -170,11 +169,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _: TimestampType if value.isInstanceOf[Instant] =>
       toJavaTimestamp(instantToMicros(value.asInstanceOf[Instant]))
     case _: TimestampNTZType if value.isInstanceOf[LocalDateTime] =>
-      val orcTimestamp = OrcUtils.toOrcNTZ(localDateTimeToMicros(value.asInstanceOf[LocalDateTime]))
-      // Hive meets OrcTimestamp will throw ClassNotFoundException, So convert it.
-      val timestamp = new Timestamp(orcTimestamp.getTime)
-      timestamp.setNanos(orcTimestamp.getNanos)
-      timestamp
+      localDateTimeToMicros(value.asInstanceOf[LocalDateTime])
     case _: YearMonthIntervalType =>
       IntervalUtils.periodToMonths(value.asInstanceOf[Period]).longValue()
     case _: DayTimeIntervalType =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
index a928cd9..5ed73c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
@@ -98,7 +98,7 @@ class OrcSerializer(dataSchema: StructType) {
       }
 
 
-    case LongType | _: DayTimeIntervalType =>
+    case LongType | _: DayTimeIntervalType | _: TimestampNTZType =>
       if (reuseObj) {
         val result = new LongWritable()
         (getter, ordinal) =>
@@ -147,8 +147,6 @@ class OrcSerializer(dataSchema: StructType) {
       result.setNanos(ts.getNanos)
       result
 
-    case TimestampNTZType => (getter, ordinal) => OrcUtils.toOrcNTZ(getter.getLong(ordinal))
-
     case DecimalType.Fixed(precision, scale) =>
       OrcShimUtils.getHiveDecimalWritable(precision, scale)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 1f05117..a68ce1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources.orc
 
 import java.nio.charset.StandardCharsets.UTF_8
-import java.sql.Timestamp
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -29,7 +28,6 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.serde2.io.DateWritable
 import org.apache.hadoop.io.{BooleanWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, ShortWritable, WritableComparable}
 import org.apache.orc.{BooleanColumnStatistics, ColumnStatistics, DateColumnStatistics, DoubleColumnStatistics, IntegerColumnStatistics, OrcConf, OrcFile, Reader, TypeDescription, Writer}
-import org.apache.orc.mapred.OrcTimestamp
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -39,8 +37,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils, DateTimeUtils}
-import org.apache.spark.sql.catalyst.util.DateTimeConstants._
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, SchemaMergeUtils}
@@ -199,7 +197,7 @@ object OrcUtils extends Logging {
       isCaseSensitive: Boolean,
       dataSchema: StructType,
       requiredSchema: StructType,
-      reader: Reader,
+      orcSchema: TypeDescription,
       conf: Configuration): Option[(Array[Int], Boolean)] = {
     def checkTimestampCompatibility(orcCatalystSchema: StructType, dataSchema: StructType): Unit = {
       orcCatalystSchema.fields.map(_.dataType).zip(dataSchema.fields.map(_.dataType)).foreach {
@@ -212,7 +210,6 @@ object OrcUtils extends Logging {
       }
     }
 
-    val orcSchema = reader.getSchema
     checkTimestampCompatibility(toCatalystSchema(orcSchema), dataSchema)
     val orcFieldNames = orcSchema.getFieldNames.asScala
     val forcePositionalEvolution = OrcConf.FORCE_POSITIONAL_EVOLUTION.getBoolean(conf)
@@ -261,7 +258,6 @@ object OrcUtils extends Logging {
                 if (matchedOrcFields.size > 1) {
                   // Need to fail if there is ambiguity, i.e. more than one field is matched.
                   val matchedOrcFieldsString = matchedOrcFields.mkString("[", ", ", "]")
-                  reader.close()
                   throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                     requiredFieldName, matchedOrcFieldsString)
                 } else {
@@ -285,18 +281,17 @@ object OrcUtils extends Logging {
    * Given a `StructType` object, this methods converts it to corresponding string representation
    * in ORC.
    */
-  def orcTypeDescriptionString(dt: DataType): String = dt match {
+  def getOrcSchemaString(dt: DataType): String = dt match {
     case s: StructType =>
       val fieldTypes = s.fields.map { f =>
-        s"${quoteIdentifier(f.name)}:${orcTypeDescriptionString(f.dataType)}"
+        s"${quoteIdentifier(f.name)}:${getOrcSchemaString(f.dataType)}"
       }
       s"struct<${fieldTypes.mkString(",")}>"
     case a: ArrayType =>
-      s"array<${orcTypeDescriptionString(a.elementType)}>"
+      s"array<${getOrcSchemaString(a.elementType)}>"
     case m: MapType =>
-      s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>"
-    case TimestampNTZType => TypeDescription.Category.TIMESTAMP.getName
-    case _: DayTimeIntervalType => LongType.catalogString
+      s"map<${getOrcSchemaString(m.keyType)},${getOrcSchemaString(m.valueType)}>"
+    case _: DayTimeIntervalType | _: TimestampNTZType => LongType.catalogString
     case _: YearMonthIntervalType => IntegerType.catalogString
     case _ => dt.catalogString
   }
@@ -306,16 +301,14 @@ object OrcUtils extends Logging {
       dt match {
         case y: YearMonthIntervalType =>
           val typeDesc = new TypeDescription(TypeDescription.Category.INT)
-          typeDesc.setAttribute(
-            CATALYST_TYPE_ATTRIBUTE_NAME, y.typeName)
+          typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, y.typeName)
           Some(typeDesc)
         case d: DayTimeIntervalType =>
           val typeDesc = new TypeDescription(TypeDescription.Category.LONG)
-          typeDesc.setAttribute(
-            CATALYST_TYPE_ATTRIBUTE_NAME, d.typeName)
+          typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, d.typeName)
           Some(typeDesc)
         case n: TimestampNTZType =>
-          val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP)
+          val typeDesc = new TypeDescription(TypeDescription.Category.LONG)
           typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, n.typeName)
           Some(typeDesc)
         case t: TimestampType =>
@@ -378,9 +371,9 @@ object OrcUtils extends Logging {
       partitionSchema: StructType,
       conf: Configuration): String = {
     val resultSchemaString = if (canPruneCols) {
-      OrcUtils.orcTypeDescriptionString(resultSchema)
+      OrcUtils.getOrcSchemaString(resultSchema)
     } else {
-      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
+      OrcUtils.getOrcSchemaString(StructType(dataSchema.fields ++ partitionSchema.fields))
     }
     OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     resultSchemaString
@@ -532,17 +525,4 @@ object OrcUtils extends Logging {
       resultRow
     }
   }
-
-  def fromOrcNTZ(ts: Timestamp): Long = {
-    DateTimeUtils.millisToMicros(ts.getTime) +
-      (ts.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
-  }
-
-  def toOrcNTZ(micros: Long): OrcTimestamp = {
-    val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
-    val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
-    val result = new OrcTimestamp(seconds * MILLIS_PER_SECOND)
-    result.setNanos(nanos.toInt)
-    result
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 63ad5ed..a955dd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -358,6 +358,8 @@ private[parquet] class ParquetRowConverter(
       case StringType =>
         new ParquetStringConverter(updater)
 
+      // As long as the parquet type is INT64 timestamp, whether logical annotation
+      // `isAdjustedToUTC` is false or true, it will be read as Spark's TimestampLTZ type
       case TimestampType
         if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
            parquetType.getLogicalTypeAnnotation
@@ -368,6 +370,8 @@ private[parquet] class ParquetRowConverter(
           }
         }
 
+      // As long as the parquet type is INT64 timestamp, whether logical annotation
+      // `isAdjustedToUTC` is false or true, it will be read as Spark's TimestampLTZ type
       case TimestampType
         if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
           parquetType.getLogicalTypeAnnotation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index ec6a3bb..ef13bea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -88,11 +88,9 @@ case class OrcPartitionReaderFactory(
     }
     val filePath = new Path(new URI(file.filePath))
 
-    val resultedColPruneInfo =
-      Utils.tryWithResource(createORCReader(filePath, conf)) { reader =>
-        OrcUtils.requestedColumnIds(
-          isCaseSensitive, dataSchema, readDataSchema, reader, conf)
-      }
+    val orcSchema = Utils.tryWithResource(createORCReader(filePath, conf))(_.getSchema)
+    val resultedColPruneInfo = OrcUtils.requestedColumnIds(
+      isCaseSensitive, dataSchema, readDataSchema, orcSchema, conf)
 
     if (resultedColPruneInfo.isEmpty) {
       new EmptyPartitionReader[InternalRow]
@@ -131,11 +129,9 @@ case class OrcPartitionReaderFactory(
     }
     val filePath = new Path(new URI(file.filePath))
 
-    val resultedColPruneInfo =
-      Utils.tryWithResource(createORCReader(filePath, conf)) { reader =>
-        OrcUtils.requestedColumnIds(
-          isCaseSensitive, dataSchema, readDataSchema, reader, conf)
-      }
+    val orcSchema = Utils.tryWithResource(createORCReader(filePath, conf))(_.getSchema)
+    val resultedColPruneInfo = OrcUtils.requestedColumnIds(
+      isCaseSensitive, dataSchema, readDataSchema, orcSchema, conf)
 
     if (resultedColPruneInfo.isEmpty) {
       new EmptyPartitionReader
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
index 1ac9266e..63c20ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
@@ -43,7 +43,7 @@ case class OrcWrite(
 
     val conf = job.getConfiguration
 
-    conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.orcTypeDescriptionString(dataSchema))
+    conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.getOrcSchemaString(dataSchema))
 
     conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 49b7cfa..f093a5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -35,6 +35,7 @@ import org.apache.orc.mapreduce.OrcInputFormat
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -803,6 +804,31 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-37463: read/write Timestamp ntz to Orc with different time zone") {
+    DateTimeTestUtils.withDefaultTimeZone(DateTimeTestUtils.LA) {
+      val sqlText = """
+                      |select
+                      | timestamp_ntz '2021-06-01 00:00:00' ts_ntz1,
+                      | timestamp_ntz '1883-11-16 00:00:00.0' as ts_ntz2,
+                      | timestamp_ntz '2021-03-14 02:15:00.0' as ts_ntz3
+                      |""".stripMargin
+
+      val df = sql(sqlText)
+
+      df.write.mode("overwrite").orc("ts_ntz_orc")
+
+      val query = "select * from `orc`.`ts_ntz_orc`"
+
+      DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+        DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+          withAllNativeOrcReaders {
+            checkAnswer(sql(query), df)
+          }
+        }
+      }
+    }
+  }
 }
 
 class OrcV1QuerySuite extends OrcQuerySuite {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org