Posted to commits@spark.apache.org by we...@apache.org on 2020/02/07 18:35:38 UTC

[spark] branch branch-3.0 updated: [SPARK-30752][SQL] Fix `to_utc_timestamp` on daylight saving day

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a9d9d83  [SPARK-30752][SQL] Fix `to_utc_timestamp` on daylight saving day
a9d9d83 is described below

commit a9d9d834c600a4b9888f8781cec04fcbef29972d
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Sat Feb 8 02:32:07 2020 +0800

    [SPARK-30752][SQL] Fix `to_utc_timestamp` on daylight saving day
    
    ### What changes were proposed in this pull request?
    - Rewrite the `convertTz` method of `DateTimeUtils` using Java 8 time API
    - Change the types of the `convertTz` parameters from `TimeZone` to `ZoneId`. This avoids unnecessary `TimeZone` -> `ZoneId` conversions and the performance regressions they would cause.
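
    The heart of the change is a wall-clock rebase between time zones. A minimal standalone sketch of the same idea in plain `java.time` (the `rebase` name here is illustrative, not the patched Spark method):
    ```scala
    import java.time.{Instant, ZoneId}

    // Take the wall-clock reading that `instant` has in `toZone` and
    // reinterpret that reading as a wall clock in `fromZone`.
    def rebase(instant: Instant, fromZone: ZoneId, toZone: ZoneId): Instant =
      instant.atZone(toZone).toLocalDateTime.atZone(fromZone).toInstant
    ```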
    
    ### Why are the changes needed?
    - Fixes the incorrect behavior of `to_utc_timestamp` on a daylight saving day. For example:
    ```scala
    scala> val df = spark.range(1)
    scala> df.select(to_utc_timestamp(lit("2019-11-03T12:00:00"), "Asia/Hong_Kong").as("local UTC")).show
    +-------------------+
    |          local UTC|
    +-------------------+
    |2019-11-03 03:00:00|
    +-------------------+
    ```
    but the correct result is `2019-11-03 04:00:00`:
    (screenshot: https://user-images.githubusercontent.com/1580697/73960846-a129bb00-491c-11ea-92f5-45831cb28a62.png)
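
    Asia/Hong_Kong is UTC+8 year-round (no daylight saving), so 12:00 local time there is always 04:00 UTC. The wrong answer came from the old code routing the conversion through the JVM default time zone, which in the repro above (PST) fell back an hour on 2019-11-03. A quick check with plain `java.time`:
    ```scala
    import java.time.{LocalDateTime, ZoneId, ZoneOffset}

    val hk = LocalDateTime.parse("2019-11-03T12:00:00")
      .atZone(ZoneId.of("Asia/Hong_Kong"))
    // Prints 2019-11-03T04:00 regardless of the JVM default time zone.
    println(hk.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime)
    ```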
    
    - Simplifies the code and makes it more maintainable.
    - Switches `convertTz` to the Proleptic Gregorian calendar that the Java 8 time classes use by default, which makes the function consistent with the other date-time functions. A short illustration of the calendar difference follows.
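
    The Proleptic Gregorian calendar applies the Gregorian leap-year rules to all dates, so results can differ from the legacy hybrid Julian/Gregorian calendar for timestamps before the 1582 cutover. A minimal illustration of the rule difference (not code from the patch):
    ```scala
    import java.time.LocalDate
    import java.util.{GregorianCalendar, TimeZone}

    // Proleptic Gregorian (java.time): 1500 is not a leap year.
    println(LocalDate.of(1500, 1, 1).isLeapYear)  // false

    // Hybrid calendar (java.util): 1500 is a leap year, because dates
    // before the 1582 cutover follow the Julian every-four-years rule.
    println(new GregorianCalendar(TimeZone.getTimeZone("UTC")).isLeapYear(1500))  // true
    ```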
    
    ### Does this PR introduce any user-facing change?
    Yes, after the changes `to_utc_timestamp` returns the correct result `2019-11-03 04:00:00`.
    
    ### How was this patch tested?
    - By the existing test suites `DateTimeUtilsSuite`, `DateFunctionsSuite` and `DateExpressionsSuite`.
    - Added the test `convert time zones on a daylight saving day` to `DateFunctionsSuite`.
    
    Closes #27474 from MaxGekk/port-convertTz-on-Java8-api.
    
    Authored-by: Maxim Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit a3e77773cfa03a18d31370acd9a10562ff5312bb)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../catalyst/expressions/datetimeExpressions.scala | 16 +++++-------
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 28 ++++-----------------
 .../parquet/VectorizedColumnReader.java            |  9 ++++---
 .../parquet/VectorizedParquetRecordReader.java     |  6 ++---
 .../datasources/parquet/ParquetFileFormat.scala    |  2 +-
 .../datasources/parquet/ParquetReadSupport.scala   |  5 ++--
 .../parquet/ParquetRecordMaterializer.scala        |  4 +--
 .../datasources/parquet/ParquetRowConverter.scala  |  9 +++----
 .../v2/parquet/ParquetPartitionReaderFactory.scala | 10 ++++----
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 29 ++++++++++++++++++++--
 .../parquet/ParquetInteroperabilitySuite.scala     |  5 ++--
 11 files changed, 64 insertions(+), 59 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 28f1d34..aa2bd5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -1176,14 +1176,12 @@ case class FromUTCTimestamp(left: Expression, right: Expression)
            |long ${ev.value} = 0;
          """.stripMargin)
       } else {
-        val tzClass = classOf[TimeZone].getName
+        val tzClass = classOf[ZoneId].getName
         val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
         val escapedTz = StringEscapeUtils.escapeJava(tz.toString)
         val tzTerm = ctx.addMutableState(tzClass, "tz",
-          v => s"""$v = $dtu.getTimeZone("$escapedTz");""")
-        val utcTerm = "tzUTC"
-        ctx.addImmutableStateIfNotExists(tzClass, utcTerm,
-          v => s"""$v = $dtu.getTimeZone("UTC");""")
+          v => s"""$v = $dtu.getZoneId("$escapedTz");""")
+        val utcTerm = "java.time.ZoneOffset.UTC"
         val eval = left.genCode(ctx)
         ev.copy(code = code"""
            |${eval.code}
@@ -1382,14 +1380,12 @@ case class ToUTCTimestamp(left: Expression, right: Expression)
            |long ${ev.value} = 0;
          """.stripMargin)
       } else {
-        val tzClass = classOf[TimeZone].getName
+        val tzClass = classOf[ZoneId].getName
         val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
         val escapedTz = StringEscapeUtils.escapeJava(tz.toString)
         val tzTerm = ctx.addMutableState(tzClass, "tz",
-          v => s"""$v = $dtu.getTimeZone("$escapedTz");""")
-        val utcTerm = "tzUTC"
-        ctx.addImmutableStateIfNotExists(tzClass, utcTerm,
-          v => s"""$v = $dtu.getTimeZone("UTC");""")
+          v => s"""$v = $dtu.getZoneId("$escapedTz");""")
+        val utcTerm = "java.time.ZoneOffset.UTC"
         val eval = left.genCode(ctx)
         ev.copy(code = code"""
            |${eval.code}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 6800abb..8eb56094 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -801,27 +801,9 @@ object DateTimeUtils {
    * mapping, the conversion here may return wrong result, we should make the timestamp
    * timezone-aware.
    */
-  def convertTz(ts: SQLTimestamp, fromZone: TimeZone, toZone: TimeZone): SQLTimestamp = {
-    // We always use local timezone to parse or format a timestamp
-    val localZone = defaultTimeZone()
-    val utcTs = if (fromZone.getID == localZone.getID) {
-      ts
-    } else {
-      // get the human time using local time zone, that actually is in fromZone.
-      val localZoneOffsetMs = localZone.getOffset(MICROSECONDS.toMillis(ts))
-      val localTsUs = ts + MILLISECONDS.toMicros(localZoneOffsetMs)  // in fromZone
-      val offsetFromLocalMs = getOffsetFromLocalMillis(MICROSECONDS.toMillis(localTsUs), fromZone)
-      localTsUs - MILLISECONDS.toMicros(offsetFromLocalMs)
-    }
-    if (toZone.getID == localZone.getID) {
-      utcTs
-    } else {
-      val toZoneOffsetMs = toZone.getOffset(MICROSECONDS.toMillis(utcTs))
-      val localTsUs = utcTs + MILLISECONDS.toMicros(toZoneOffsetMs)  // in toZone
-      // treat it as local timezone, convert to UTC (we could get the expected human time back)
-      val offsetFromLocalMs = getOffsetFromLocalMillis(MICROSECONDS.toMillis(localTsUs), localZone)
-      localTsUs - MILLISECONDS.toMicros(offsetFromLocalMs)
-    }
+  def convertTz(ts: SQLTimestamp, fromZone: ZoneId, toZone: ZoneId): SQLTimestamp = {
+    val rebasedDateTime = microsToInstant(ts).atZone(toZone).toLocalDateTime.atZone(fromZone)
+    instantToMicros(rebasedDateTime.toInstant)
   }
 
   /**
@@ -829,7 +811,7 @@ object DateTimeUtils {
    * representation in their timezone.
    */
   def fromUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
-    convertTz(time, TimeZoneGMT, getTimeZone(timeZone))
+    convertTz(time, ZoneOffset.UTC, getZoneId(timeZone))
   }
 
   /**
@@ -837,7 +819,7 @@ object DateTimeUtils {
    * string representation in their timezone.
    */
   def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
-    convertTz(time, getTimeZone(timeZone), TimeZoneGMT)
+    convertTz(time, getZoneId(timeZone), ZoneOffset.UTC)
   }
 
   /**
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index ba26b57..3294655 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -18,8 +18,9 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.util.Arrays;
-import java.util.TimeZone;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
@@ -98,14 +99,14 @@ public class VectorizedColumnReader {
   private final ColumnDescriptor descriptor;
   private final OriginalType originalType;
   // The timezone conversion to apply to int96 timestamps. Null if no conversion.
-  private final TimeZone convertTz;
-  private static final TimeZone UTC = DateTimeUtils.TimeZoneUTC();
+  private final ZoneId convertTz;
+  private static final ZoneId UTC = ZoneOffset.UTC;
 
   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
       OriginalType originalType,
       PageReader pageReader,
-      TimeZone convertTz) throws IOException {
+      ZoneId convertTz) throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
     this.convertTz = convertTz;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index f028613..7306709 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.List;
-import java.util.TimeZone;
 
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -86,7 +86,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    * The timezone that timestamp INT96 values should be converted to. Null if no conversion. Here to
    * workaround incompatibilities between different engines when writing timestamp values.
    */
-  private TimeZone convertTz = null;
+  private ZoneId convertTz = null;
 
   /**
    * columnBatch object that is used for batch decoding. This is created on first use and triggers
@@ -116,7 +116,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
   private final MemoryMode MEMORY_MODE;
 
-  public VectorizedParquetRecordReader(TimeZone convertTz, boolean useOffHeap, int capacity) {
+  public VectorizedParquetRecordReader(ZoneId convertTz, boolean useOffHeap, int capacity) {
     this.convertTz = convertTz;
     MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
     this.capacity = capacity;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f52aaf0..29dbd8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -295,7 +295,7 @@ class ParquetFileFormat
 
       val convertTz =
         if (timestampConversion && !isCreatedByParquetMr) {
-          Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+          Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
         } else {
           None
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 69c8bad..c05ecf1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.util.{Locale, Map => JMap, TimeZone}
+import java.time.ZoneId
+import java.util.{Locale, Map => JMap}
 
 import scala.collection.JavaConverters._
 
@@ -49,7 +50,7 @@ import org.apache.spark.sql.types._
  * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
  */
-class ParquetReadSupport(val convertTz: Option[TimeZone],
+class ParquetReadSupport(val convertTz: Option[ZoneId],
     enableVectorizedReader: Boolean)
   extends ReadSupport[InternalRow] with Logging {
   private var catalystRequestedSchema: StructType = _
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index 3098a33..5622169 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.util.TimeZone
+import java.time.ZoneId
 
 import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType
@@ -36,7 +36,7 @@ private[parquet] class ParquetRecordMaterializer(
     parquetSchema: MessageType,
     catalystSchema: StructType,
     schemaConverter: ParquetToSparkSchemaConverter,
-    convertTz: Option[TimeZone])
+    convertTz: Option[ZoneId])
   extends RecordMaterializer[InternalRow] {
 
   private val rootConverter =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 98ac2ec..850adae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder
-import java.util.TimeZone
+import java.time.{ZoneId, ZoneOffset}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -125,7 +125,7 @@ private[parquet] class ParquetRowConverter(
     schemaConverter: ParquetToSparkSchemaConverter,
     parquetType: GroupType,
     catalystType: StructType,
-    convertTz: Option[TimeZone],
+    convertTz: Option[ZoneId],
     updater: ParentContainerUpdater)
   extends ParquetGroupConverter(updater) with Logging {
 
@@ -154,8 +154,6 @@ private[parquet] class ParquetRowConverter(
        |${catalystType.prettyJson}
      """.stripMargin)
 
-  private[this] val UTC = DateTimeUtils.TimeZoneUTC
-
   /**
    * Updater used together with field converters within a [[ParquetRowConverter]].  It propagates
    * converted filed values to the `ordinal`-th cell in `currentRow`.
@@ -292,7 +290,8 @@ private[parquet] class ParquetRowConverter(
             val timeOfDayNanos = buf.getLong
             val julianDay = buf.getInt
             val rawTime = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
-            val adjTime = convertTz.map(DateTimeUtils.convertTz(rawTime, _, UTC)).getOrElse(rawTime)
+            val adjTime = convertTz.map(DateTimeUtils.convertTz(rawTime, _, ZoneOffset.UTC))
+              .getOrElse(rawTime)
             updater.setLong(adjTime)
           }
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index b2fc724..047bc74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution.datasources.v2.parquet
 
 import java.net.URI
-import java.util.TimeZone
+import java.time.ZoneId
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
@@ -117,7 +117,7 @@ case class ParquetPartitionReaderFactory(
       file: PartitionedFile,
       buildReaderFunc: (
         ParquetInputSplit, InternalRow, TaskAttemptContextImpl, Option[FilterPredicate],
-          Option[TimeZone]) => RecordReader[Void, T]): RecordReader[Void, T] = {
+          Option[ZoneId]) => RecordReader[Void, T]): RecordReader[Void, T] = {
     val conf = broadcastedConf.value.value
 
     val filePath = new Path(new URI(file.filePath))
@@ -156,7 +156,7 @@ case class ParquetPartitionReaderFactory(
 
     val convertTz =
       if (timestampConversion && !isCreatedByParquetMr) {
-        Some(DateTimeUtils.getTimeZone(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+        Some(DateTimeUtils.getZoneId(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
       } else {
         None
       }
@@ -184,7 +184,7 @@ case class ParquetPartitionReaderFactory(
       partitionValues: InternalRow,
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],
-      convertTz: Option[TimeZone]): RecordReader[Void, InternalRow] = {
+      convertTz: Option[ZoneId]): RecordReader[Void, InternalRow] = {
     logDebug(s"Falling back to parquet-mr")
     val taskContext = Option(TaskContext.get())
     // ParquetRecordReader returns InternalRow
@@ -213,7 +213,7 @@ case class ParquetPartitionReaderFactory(
       partitionValues: InternalRow,
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],
-      convertTz: Option[TimeZone]): VectorizedParquetRecordReader = {
+      convertTz: Option[ZoneId]): VectorizedParquetRecordReader = {
     val taskContext = Option(TaskContext.get())
     val vectorizedReader = new VectorizedParquetRecordReader(
       convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 3b3d3cc..bb8cdf3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql
 
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
-import java.time.Instant
-import java.util.Locale
+import java.time.{Instant, LocalDateTime}
+import java.util.{Locale, TimeZone}
 import java.util.concurrent.TimeUnit
 
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
@@ -803,4 +803,29 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
       checkTimeZoneParsing(null)
     }
   }
+
+  test("SPARK-30752: convert time zones on a daylight saving day") {
+    val systemTz = "PST"
+    val sessionTz = "UTC"
+    val fromTz = "Asia/Hong_Kong"
+    val fromTs = "2019-11-03T12:00:00" // daylight saving date in PST
+    val utsTs = "2019-11-03T04:00:00"
+    val defaultTz = TimeZone.getDefault
+    try {
+      TimeZone.setDefault(DateTimeUtils.getTimeZone(systemTz))
+      withSQLConf(
+        SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true",
+        SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz) {
+        val expected = LocalDateTime.parse(utsTs)
+          .atZone(DateTimeUtils.getZoneId(sessionTz))
+          .toInstant
+        val df = Seq(fromTs).toDF("localTs")
+        checkAnswer(
+          df.select(to_utc_timestamp(col("localTs"), fromTz)),
+          Row(expected))
+      }
+    } finally {
+      TimeZone.setDefault(defaultTz)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
index 1ded34f..649a46f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
+import java.time.ZoneOffset
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
@@ -145,8 +146,8 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
               impalaFileData.map { ts =>
                 DateTimeUtils.toJavaTimestamp(DateTimeUtils.convertTz(
                   DateTimeUtils.fromJavaTimestamp(ts),
-                  DateTimeUtils.TimeZoneUTC,
-                  DateTimeUtils.getTimeZone(conf.sessionLocalTimeZone)))
+                  ZoneOffset.UTC,
+                  DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)))
               }
             }
             val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray

