Posted to commits@spark.apache.org by we...@apache.org on 2022/03/24 14:46:45 UTC
[spark] branch branch-3.3 updated: [SPARK-37463][SQL] Read/Write Timestamp ntz from/to Orc uses int64
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 5582f92 [SPARK-37463][SQL] Read/Write Timestamp ntz from/to Orc uses int64
5582f92 is described below
commit 5582f92046a3486dc6d30e6e4083446fdbd52667
Author: Jiaan Geng <be...@163.com>
AuthorDate: Thu Mar 24 22:43:32 2022 +0800
[SPARK-37463][SQL] Read/Write Timestamp ntz from/to Orc uses int64
### What changes were proposed in this pull request?
#33588 (comment) shows that Spark cannot read/write timestamp ntz and ltz correctly. Based on the discussion https://github.com/apache/spark/pull/34741#issuecomment-983660633, this PR changes the read/write path so that timestamp ntz is stored in Orc as int64.
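To illustrate, here is a minimal sketch of the representational change (the names `oldToOrcNTZ`/`oldFromOrcNTZ` mirror the `OrcUtils.toOrcNTZ`/`fromOrcNTZ` helpers this patch removes; the constants come from DateTimeConstants):

import java.sql.Timestamp
import org.apache.orc.mapred.OrcTimestamp
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeConstants._

// Old write path: wrap the microseconds value in an OrcTimestamp. ORC's
// TIMESTAMP encoding is time-zone dependent, so a value written under one
// session time zone could read back shifted under another.
def oldToOrcNTZ(micros: Long): OrcTimestamp = {
  val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
  val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
  val result = new OrcTimestamp(seconds * MILLIS_PER_SECOND)
  result.setNanos(nanos.toInt)
  result
}

// Old read path: reassemble microseconds from the millis + nanos fields.
def oldFromOrcNTZ(ts: Timestamp): Long =
  DateTimeUtils.millisToMicros(ts.getTime) +
    (ts.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS

// New path: the microseconds value itself is written and read as a plain
// int64 (LongWritable), which carries no time-zone component at all.

With this change, timestamp ntz shares the LongWritable code path already used for LongType and DayTimeIntervalType, so no zone-sensitive conversion step remains.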
### Why are the changes needed?
Fixes a bug when reading/writing timestamp ntz from/to Orc under different time zones.
### Does this PR introduce _any_ user-facing change?
Yes. Reading/writing Orc timestamp ntz is a new feature.
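For example, a round trip now behaves consistently across session time zones (a hypothetical snippet; `spark` is an active SparkSession and the output path is illustrative):

// Write a timestamp_ntz column; it is stored in Orc as int64 microseconds.
val df = spark.sql("select timestamp_ntz '2021-06-01 00:00:00' as ts_ntz")
df.write.mode("overwrite").orc("/tmp/ts_ntz_orc")

// Reading it back in any session time zone returns the same wall-clock value.
spark.read.orc("/tmp/ts_ntz_orc").show()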
### How was this patch tested?
New tests.
Closes #34984 from beliefer/SPARK-37463-int64.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit e410d98f57750080ad46932cc9211d2cf5154c24)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../datasources/orc/OrcAtomicColumnVector.java | 10 -----
.../datasources/orc/OrcDeserializer.scala | 5 +--
.../execution/datasources/orc/OrcFileFormat.scala | 9 ++---
.../sql/execution/datasources/orc/OrcFilters.scala | 11 ++----
.../execution/datasources/orc/OrcSerializer.scala | 4 +-
.../sql/execution/datasources/orc/OrcUtils.scala | 46 ++++++----------------
.../datasources/parquet/ParquetRowConverter.scala | 4 ++
.../v2/orc/OrcPartitionReaderFactory.scala | 16 +++-----
.../execution/datasources/v2/orc/OrcWrite.scala | 2 +-
.../execution/datasources/orc/OrcQuerySuite.scala | 26 ++++++++++++
10 files changed, 59 insertions(+), 74 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
index b4f7b99..c2d8334 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
@@ -27,7 +27,6 @@ import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
-import org.apache.spark.sql.types.TimestampNTZType;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;
@@ -37,7 +36,6 @@ import org.apache.spark.unsafe.types.UTF8String;
*/
public class OrcAtomicColumnVector extends OrcColumnVector {
private final boolean isTimestamp;
- private final boolean isTimestampNTZ;
private final boolean isDate;
// Column vector for each type. Only 1 is populated for any type.
@@ -56,12 +54,6 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
isTimestamp = false;
}
- if (type instanceof TimestampNTZType) {
- isTimestampNTZ = true;
- } else {
- isTimestampNTZ = false;
- }
-
if (type instanceof DateType) {
isDate = true;
} else {
@@ -113,8 +105,6 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
int index = getRowIndex(rowId);
if (isTimestamp) {
return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
- } else if (isTimestampNTZ) {
- return OrcUtils.fromOrcNTZ(timestampData.asScratchTimestamp(index));
} else {
return longData.vector[index];
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 0c2856c..564e42e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -105,7 +105,7 @@ class OrcDeserializer(
case IntegerType | _: YearMonthIntervalType => (ordinal, value) =>
updater.setInt(ordinal, value.asInstanceOf[IntWritable].get)
- case LongType | _: DayTimeIntervalType => (ordinal, value) =>
+ case LongType | _: DayTimeIntervalType | _: TimestampNTZType => (ordinal, value) =>
updater.setLong(ordinal, value.asInstanceOf[LongWritable].get)
case FloatType => (ordinal, value) =>
@@ -129,9 +129,6 @@ class OrcDeserializer(
case TimestampType => (ordinal, value) =>
updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))
- case TimestampNTZType => (ordinal, value) =>
- updater.setLong(ordinal, OrcUtils.fromOrcNTZ(value.asInstanceOf[OrcTimestamp]))
-
case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
val v = OrcShimUtils.getDecimal(value)
v.changePrecision(precision, scale)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 39a8763..2b060c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -142,11 +142,10 @@ class OrcFileFormat
val fs = filePath.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
- val resultedColPruneInfo =
- Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
- OrcUtils.requestedColumnIds(
- isCaseSensitive, dataSchema, requiredSchema, reader, conf)
- }
+ val orcSchema =
+ Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions))(_.getSchema)
+ val resultedColPruneInfo = OrcUtils.requestedColumnIds(
+ isCaseSensitive, dataSchema, requiredSchema, orcSchema, conf)
if (resultedColPruneInfo.isEmpty) {
Iterator.empty
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 0d85a45..4bb1c18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.orc
-import java.sql.Timestamp
import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
import org.apache.hadoop.hive.common.`type`.HiveDecimal
@@ -143,11 +142,11 @@ private[sql] object OrcFilters extends OrcFiltersBase {
def getPredicateLeafType(dataType: DataType): PredicateLeaf.Type = dataType match {
case BooleanType => PredicateLeaf.Type.BOOLEAN
case ByteType | ShortType | IntegerType | LongType |
- _: AnsiIntervalType => PredicateLeaf.Type.LONG
+ _: AnsiIntervalType | TimestampNTZType => PredicateLeaf.Type.LONG
case FloatType | DoubleType => PredicateLeaf.Type.FLOAT
case StringType => PredicateLeaf.Type.STRING
case DateType => PredicateLeaf.Type.DATE
- case TimestampType | TimestampNTZType => PredicateLeaf.Type.TIMESTAMP
+ case TimestampType => PredicateLeaf.Type.TIMESTAMP
case _: DecimalType => PredicateLeaf.Type.DECIMAL
case _ => throw QueryExecutionErrors.unsupportedOperationForDataTypeError(dataType)
}
@@ -170,11 +169,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
case _: TimestampType if value.isInstanceOf[Instant] =>
toJavaTimestamp(instantToMicros(value.asInstanceOf[Instant]))
case _: TimestampNTZType if value.isInstanceOf[LocalDateTime] =>
- val orcTimestamp = OrcUtils.toOrcNTZ(localDateTimeToMicros(value.asInstanceOf[LocalDateTime]))
- // Hive meets OrcTimestamp will throw ClassNotFoundException, So convert it.
- val timestamp = new Timestamp(orcTimestamp.getTime)
- timestamp.setNanos(orcTimestamp.getNanos)
- timestamp
+ localDateTimeToMicros(value.asInstanceOf[LocalDateTime])
case _: YearMonthIntervalType =>
IntervalUtils.periodToMonths(value.asInstanceOf[Period]).longValue()
case _: DayTimeIntervalType =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
index a928cd9..5ed73c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
@@ -98,7 +98,7 @@ class OrcSerializer(dataSchema: StructType) {
}
- case LongType | _: DayTimeIntervalType =>
+ case LongType | _: DayTimeIntervalType | _: TimestampNTZType =>
if (reuseObj) {
val result = new LongWritable()
(getter, ordinal) =>
@@ -147,8 +147,6 @@ class OrcSerializer(dataSchema: StructType) {
result.setNanos(ts.getNanos)
result
- case TimestampNTZType => (getter, ordinal) => OrcUtils.toOrcNTZ(getter.getLong(ordinal))
-
case DecimalType.Fixed(precision, scale) =>
OrcShimUtils.getHiveDecimalWritable(precision, scale)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 1f05117..a68ce1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources.orc
import java.nio.charset.StandardCharsets.UTF_8
-import java.sql.Timestamp
import java.util.Locale
import scala.collection.JavaConverters._
@@ -29,7 +28,6 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.serde2.io.DateWritable
import org.apache.hadoop.io.{BooleanWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, ShortWritable, WritableComparable}
import org.apache.orc.{BooleanColumnStatistics, ColumnStatistics, DateColumnStatistics, DoubleColumnStatistics, IntegerColumnStatistics, OrcConf, OrcFile, Reader, TypeDescription, Writer}
-import org.apache.orc.mapred.OrcTimestamp
import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -39,8 +37,8 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils, DateTimeUtils}
-import org.apache.spark.sql.catalyst.util.DateTimeConstants._
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, SchemaMergeUtils}
@@ -199,7 +197,7 @@ object OrcUtils extends Logging {
isCaseSensitive: Boolean,
dataSchema: StructType,
requiredSchema: StructType,
- reader: Reader,
+ orcSchema: TypeDescription,
conf: Configuration): Option[(Array[Int], Boolean)] = {
def checkTimestampCompatibility(orcCatalystSchema: StructType, dataSchema: StructType): Unit = {
orcCatalystSchema.fields.map(_.dataType).zip(dataSchema.fields.map(_.dataType)).foreach {
@@ -212,7 +210,6 @@ object OrcUtils extends Logging {
}
}
- val orcSchema = reader.getSchema
checkTimestampCompatibility(toCatalystSchema(orcSchema), dataSchema)
val orcFieldNames = orcSchema.getFieldNames.asScala
val forcePositionalEvolution = OrcConf.FORCE_POSITIONAL_EVOLUTION.getBoolean(conf)
@@ -261,7 +258,6 @@ object OrcUtils extends Logging {
if (matchedOrcFields.size > 1) {
// Need to fail if there is ambiguity, i.e. more than one field is matched.
val matchedOrcFieldsString = matchedOrcFields.mkString("[", ", ", "]")
- reader.close()
throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
requiredFieldName, matchedOrcFieldsString)
} else {
@@ -285,18 +281,17 @@ object OrcUtils extends Logging {
* Given a `StructType` object, this methods converts it to corresponding string representation
* in ORC.
*/
- def orcTypeDescriptionString(dt: DataType): String = dt match {
+ def getOrcSchemaString(dt: DataType): String = dt match {
case s: StructType =>
val fieldTypes = s.fields.map { f =>
- s"${quoteIdentifier(f.name)}:${orcTypeDescriptionString(f.dataType)}"
+ s"${quoteIdentifier(f.name)}:${getOrcSchemaString(f.dataType)}"
}
s"struct<${fieldTypes.mkString(",")}>"
case a: ArrayType =>
- s"array<${orcTypeDescriptionString(a.elementType)}>"
+ s"array<${getOrcSchemaString(a.elementType)}>"
case m: MapType =>
- s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>"
- case TimestampNTZType => TypeDescription.Category.TIMESTAMP.getName
- case _: DayTimeIntervalType => LongType.catalogString
+ s"map<${getOrcSchemaString(m.keyType)},${getOrcSchemaString(m.valueType)}>"
+ case _: DayTimeIntervalType | _: TimestampNTZType => LongType.catalogString
case _: YearMonthIntervalType => IntegerType.catalogString
case _ => dt.catalogString
}
@@ -306,16 +301,14 @@ object OrcUtils extends Logging {
dt match {
case y: YearMonthIntervalType =>
val typeDesc = new TypeDescription(TypeDescription.Category.INT)
- typeDesc.setAttribute(
- CATALYST_TYPE_ATTRIBUTE_NAME, y.typeName)
+ typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, y.typeName)
Some(typeDesc)
case d: DayTimeIntervalType =>
val typeDesc = new TypeDescription(TypeDescription.Category.LONG)
- typeDesc.setAttribute(
- CATALYST_TYPE_ATTRIBUTE_NAME, d.typeName)
+ typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, d.typeName)
Some(typeDesc)
case n: TimestampNTZType =>
- val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP)
+ val typeDesc = new TypeDescription(TypeDescription.Category.LONG)
typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, n.typeName)
Some(typeDesc)
case t: TimestampType =>
@@ -378,9 +371,9 @@ object OrcUtils extends Logging {
partitionSchema: StructType,
conf: Configuration): String = {
val resultSchemaString = if (canPruneCols) {
- OrcUtils.orcTypeDescriptionString(resultSchema)
+ OrcUtils.getOrcSchemaString(resultSchema)
} else {
- OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
+ OrcUtils.getOrcSchemaString(StructType(dataSchema.fields ++ partitionSchema.fields))
}
OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
resultSchemaString
@@ -532,17 +525,4 @@ object OrcUtils extends Logging {
resultRow
}
}
-
- def fromOrcNTZ(ts: Timestamp): Long = {
- DateTimeUtils.millisToMicros(ts.getTime) +
- (ts.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
- }
-
- def toOrcNTZ(micros: Long): OrcTimestamp = {
- val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
- val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
- val result = new OrcTimestamp(seconds * MILLIS_PER_SECOND)
- result.setNanos(nanos.toInt)
- result
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 63ad5ed..a955dd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -358,6 +358,8 @@ private[parquet] class ParquetRowConverter(
case StringType =>
new ParquetStringConverter(updater)
+ // As long as the parquet type is INT64 timestamp, whether logical annotation
+ // `isAdjustedToUTC` is false or true, it will be read as Spark's TimestampLTZ type
case TimestampType
if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
parquetType.getLogicalTypeAnnotation
@@ -368,6 +370,8 @@ private[parquet] class ParquetRowConverter(
}
}
+ // As long as the parquet type is INT64 timestamp, whether logical annotation
+ // `isAdjustedToUTC` is false or true, it will be read as Spark's TimestampLTZ type
case TimestampType
if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
parquetType.getLogicalTypeAnnotation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index ec6a3bb..ef13bea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -88,11 +88,9 @@ case class OrcPartitionReaderFactory(
}
val filePath = new Path(new URI(file.filePath))
- val resultedColPruneInfo =
- Utils.tryWithResource(createORCReader(filePath, conf)) { reader =>
- OrcUtils.requestedColumnIds(
- isCaseSensitive, dataSchema, readDataSchema, reader, conf)
- }
+ val orcSchema = Utils.tryWithResource(createORCReader(filePath, conf))(_.getSchema)
+ val resultedColPruneInfo = OrcUtils.requestedColumnIds(
+ isCaseSensitive, dataSchema, readDataSchema, orcSchema, conf)
if (resultedColPruneInfo.isEmpty) {
new EmptyPartitionReader[InternalRow]
@@ -131,11 +129,9 @@ case class OrcPartitionReaderFactory(
}
val filePath = new Path(new URI(file.filePath))
- val resultedColPruneInfo =
- Utils.tryWithResource(createORCReader(filePath, conf)) { reader =>
- OrcUtils.requestedColumnIds(
- isCaseSensitive, dataSchema, readDataSchema, reader, conf)
- }
+ val orcSchema = Utils.tryWithResource(createORCReader(filePath, conf))(_.getSchema)
+ val resultedColPruneInfo = OrcUtils.requestedColumnIds(
+ isCaseSensitive, dataSchema, readDataSchema, orcSchema, conf)
if (resultedColPruneInfo.isEmpty) {
new EmptyPartitionReader
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
index 1ac9266e..63c20ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
@@ -43,7 +43,7 @@ case class OrcWrite(
val conf = job.getConfiguration
- conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.orcTypeDescriptionString(dataSchema))
+ conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.getOrcSchemaString(dataSchema))
conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 49b7cfa..f093a5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -35,6 +35,7 @@ import org.apache.orc.mapreduce.OrcInputFormat
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -803,6 +804,31 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
}
}
}
+
+ test("SPARK-37463: read/write Timestamp ntz to Orc with different time zone") {
+ DateTimeTestUtils.withDefaultTimeZone(DateTimeTestUtils.LA) {
+ val sqlText = """
+ |select
+ | timestamp_ntz '2021-06-01 00:00:00' ts_ntz1,
+ | timestamp_ntz '1883-11-16 00:00:00.0' as ts_ntz2,
+ | timestamp_ntz '2021-03-14 02:15:00.0' as ts_ntz3
+ |""".stripMargin
+
+ val df = sql(sqlText)
+
+ df.write.mode("overwrite").orc("ts_ntz_orc")
+
+ val query = "select * from `orc`.`ts_ntz_orc`"
+
+ DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+ DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+ withAllNativeOrcReaders {
+ checkAnswer(sql(query), df)
+ }
+ }
+ }
+ }
+ }
}
class OrcV1QuerySuite extends OrcQuerySuite {