You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/12/16 01:32:34 UTC

[spark] branch master updated: [SPARK-26243][SQL] Use java.time API for parsing timestamps and dates from JSON

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a27952  [SPARK-26243][SQL] Use java.time API for parsing timestamps and dates from JSON
8a27952 is described below

commit 8a27952cdbf492939d9bda59e2f516f574581636
Author: Maxim Gekk <ma...@databricks.com>
AuthorDate: Sun Dec 16 09:32:13 2018 +0800

    [SPARK-26243][SQL] Use java.time API for parsing timestamps and dates from JSON
    
    ## What changes were proposed in this pull request?
    
    In this PR, I propose switching to the **java.time API** for parsing timestamps and dates from JSON inputs with microsecond precision. The SQL config `spark.sql.legacy.timeParser.enabled` allows switching back to the previous behavior, which uses `java.text.SimpleDateFormat`/`FastDateFormat` for parsing/generating timestamps/dates.
    
    ## How was this patch tested?
    
    It was tested by `JsonExpressionsSuite`, `JsonFunctionsSuite` and `JsonSuite`.
    
    Closes #23196 from MaxGekk/json-time-parser.
    
    Lead-authored-by: Maxim Gekk <ma...@databricks.com>
    Co-authored-by: Maxim Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 docs/sql-migration-guide-upgrade.md                |   2 +-
 .../spark/sql/catalyst/csv/CSVInferSchema.scala    |   6 +-
 .../sql/catalyst/csv/UnivocityGenerator.scala      |   8 +-
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |   6 +-
 .../spark/sql/catalyst/json/JSONOptions.scala      |  10 +-
 .../spark/sql/catalyst/json/JacksonGenerator.scala |  14 +-
 .../spark/sql/catalyst/json/JacksonParser.scala    |  35 +---
 ...imeFormatter.scala => TimestampFormatter.scala} |  93 ++++++----
 .../spark/sql/util/DateTimeFormatterSuite.scala    | 103 -----------
 .../sql/util/DateTimestampFormatterSuite.scala     | 174 ++++++++++++++++++
 .../sql/execution/datasources/json/JsonSuite.scala | 201 +++++++++++----------
 .../spark/sql/sources/HadoopFsRelationTest.scala   | 105 ++++++-----
 12 files changed, 422 insertions(+), 335 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index 8834e89..115fc65 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -35,7 +35,7 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.
 
-  - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
+  - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and generating CSV/JSON content. In Spark version 2.4 and earlier, java.text.SimpleDateFormat is used for the same purpose with fallbacks to the parsing mechanisms of Spark 2.0 and 1.x. For example, `2018-12-08 10:39:21.123` with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 because the timestamp does not match the pattern, but it can be parsed by earlier Spark versions due to a fallback  [...]
 
   - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 345dc4d..35ade13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -22,13 +22,13 @@ import scala.util.control.Exception.allCatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
-import org.apache.spark.sql.catalyst.util.DateTimeFormatter
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.types._
 
 class CSVInferSchema(val options: CSVOptions) extends Serializable {
 
   @transient
-  private lazy val timeParser = DateTimeFormatter(
+  private lazy val timestampParser = TimestampFormatter(
     options.timestampFormat,
     options.timeZone,
     options.locale)
@@ -160,7 +160,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
 
   private def tryParseTimestamp(field: String): DataType = {
     // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt timeParser.parse(field)).isDefined) {
+    if ((allCatch opt timestampParser.parse(field)).isDefined) {
       TimestampType
     } else {
       tryParseBoolean(field)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index af09cd6..f012d96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -22,7 +22,7 @@ import java.io.Writer
 import com.univocity.parsers.csv.CsvWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.types._
 
 class UnivocityGenerator(
@@ -41,18 +41,18 @@ class UnivocityGenerator(
   private val valueConverters: Array[ValueConverter] =
     schema.map(_.dataType).map(makeConverter).toArray
 
-  private val timeFormatter = DateTimeFormatter(
+  private val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
     options.timeZone,
     options.locale)
-  private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
 
   private def makeConverter(dataType: DataType): ValueConverter = dataType match {
     case DateType =>
       (row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal))
 
     case TimestampType =>
-      (row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal))
+      (row: InternalRow, ordinal: Int) => timestampFormatter.format(row.getLong(ordinal))
 
     case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 0f375e0..ed08912 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -74,11 +74,11 @@ class UnivocityParser(
 
   private val row = new GenericInternalRow(requiredSchema.length)
 
-  private val timeFormatter = DateTimeFormatter(
+  private val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
     options.timeZone,
     options.locale)
-  private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
 
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
@@ -158,7 +158,7 @@ class UnivocityParser(
       }
 
     case _: TimestampType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(timeFormatter.parse)
+      nullSafeDatum(d, name, nullable, options)(timestampFormatter.parse)
 
     case _: DateType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index e10b8a3..eaff3fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -21,7 +21,6 @@ import java.nio.charset.{Charset, StandardCharsets}
 import java.util.{Locale, TimeZone}
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
@@ -82,13 +81,10 @@ private[sql] class JSONOptions(
   val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
-  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
-  val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale)
+  val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
 
-  val timestampFormat: FastDateFormat =
-    FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale)
+  val timestampFormat: String =
+    parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index d02a2be..951f519 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
-import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.types._
 
 /**
@@ -77,6 +77,12 @@ private[sql] class JacksonGenerator(
 
   private val lineSeparator: String = options.lineSeparatorInWrite
 
+  private val timestampFormatter = TimestampFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+
   private def makeWriter(dataType: DataType): ValueWriter = dataType match {
     case NullType =>
       (row: SpecializedGetters, ordinal: Int) =>
@@ -116,14 +122,12 @@ private[sql] class JacksonGenerator(
 
     case TimestampType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        val timestampString =
-          options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+        val timestampString = timestampFormatter.format(row.getLong(ordinal))
         gen.writeString(timestampString)
 
     case DateType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        val dateString =
-          options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+        val dateString = dateFormatter.format(row.getInt(ordinal))
         gen.writeString(dateString)
 
     case BinaryType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 7e3bd4d..3f245e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -55,6 +55,12 @@ class JacksonParser(
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
+  private val timestampFormatter = TimestampFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
@@ -218,17 +224,7 @@ class JacksonParser(
     case TimestampType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
         case VALUE_STRING if parser.getTextLength >= 1 =>
-          val stringValue = parser.getText
-          // This one will lose microseconds parts.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.
-          Long.box {
-            Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
-              .getOrElse {
-                // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-                // compatibility.
-                DateTimeUtils.stringToTime(stringValue).getTime * 1000L
-              }
-          }
+          timestampFormatter.parse(parser.getText)
 
         case VALUE_NUMBER_INT =>
           parser.getLongValue * 1000000L
@@ -237,22 +233,7 @@ class JacksonParser(
     case DateType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
         case VALUE_STRING if parser.getTextLength >= 1 =>
-          val stringValue = parser.getText
-          // This one will lose microseconds parts.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.x
-          Int.box {
-            Try(DateTimeUtils.millisToDays(options.dateFormat.parse(stringValue).getTime))
-              .orElse {
-                // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-                // compatibility.
-                Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(stringValue).getTime))
-              }
-              .getOrElse {
-                // In Spark 1.5.0, we store the data as number of days since epoch in string.
-                // So, we just convert it to Int.
-                stringValue.toInt
-              }
-          }
+          dateFormatter.parse(parser.getText)
       }
 
     case BinaryType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
similarity index 63%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index ad1f413..2b8d22d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.util
 
 import java.time._
 import java.time.format.DateTimeFormatterBuilder
-import java.time.temporal.{ChronoField, TemporalQueries}
+import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
 import java.util.{Locale, TimeZone}
 
 import scala.util.Try
@@ -28,31 +28,44 @@ import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.sql.internal.SQLConf
 
-sealed trait DateTimeFormatter {
+sealed trait TimestampFormatter {
   def parse(s: String): Long // returns microseconds since epoch
   def format(us: Long): String
 }
 
-class Iso8601DateTimeFormatter(
+trait FormatterUtils {
+  protected def zoneId: ZoneId
+  protected def buildFormatter(
+      pattern: String,
+      locale: Locale): java.time.format.DateTimeFormatter = {
+    new DateTimeFormatterBuilder()
+      .appendPattern(pattern)
+      .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
+      .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
+      .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
+      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+      .toFormatter(locale)
+  }
+  protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor): java.time.Instant = {
+    val localDateTime = LocalDateTime.from(temporalAccessor)
+    val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId)
+    Instant.from(zonedDateTime)
+  }
+}
+
+class Iso8601TimestampFormatter(
     pattern: String,
     timeZone: TimeZone,
-    locale: Locale) extends DateTimeFormatter {
-  val formatter = new DateTimeFormatterBuilder()
-    .appendPattern(pattern)
-    .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
-    .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
-    .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
-    .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
-    .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
-    .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
-    .toFormatter(locale)
+    locale: Locale) extends TimestampFormatter with FormatterUtils {
+  val zoneId = timeZone.toZoneId
+  val formatter = buildFormatter(pattern, locale)
 
   def toInstant(s: String): Instant = {
     val temporalAccessor = formatter.parse(s)
     if (temporalAccessor.query(TemporalQueries.offset()) == null) {
-      val localDateTime = LocalDateTime.from(temporalAccessor)
-      val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId)
-      Instant.from(zonedDateTime)
+      toInstantWithZoneId(temporalAccessor)
     } else {
       Instant.from(temporalAccessor)
     }
@@ -75,10 +88,10 @@ class Iso8601DateTimeFormatter(
   }
 }
 
-class LegacyDateTimeFormatter(
+class LegacyTimestampFormatter(
     pattern: String,
     timeZone: TimeZone,
-    locale: Locale) extends DateTimeFormatter {
+    locale: Locale) extends TimestampFormatter {
   val format = FastDateFormat.getInstance(pattern, timeZone, locale)
 
   protected def toMillis(s: String): Long = format.parse(s).getTime
@@ -90,21 +103,21 @@ class LegacyDateTimeFormatter(
   }
 }
 
-class LegacyFallbackDateTimeFormatter(
+class LegacyFallbackTimestampFormatter(
     pattern: String,
     timeZone: TimeZone,
-    locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) {
+    locale: Locale) extends LegacyTimestampFormatter(pattern, timeZone, locale) {
   override def toMillis(s: String): Long = {
     Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
   }
 }
 
-object DateTimeFormatter {
-  def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = {
+object TimestampFormatter {
+  def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
     if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyFallbackDateTimeFormatter(format, timeZone, locale)
+      new LegacyFallbackTimestampFormatter(format, timeZone, locale)
     } else {
-      new Iso8601DateTimeFormatter(format, timeZone, locale)
+      new Iso8601TimestampFormatter(format, timeZone, locale)
     }
   }
 }
@@ -116,13 +129,19 @@ sealed trait DateFormatter {
 
 class Iso8601DateFormatter(
     pattern: String,
-    timeZone: TimeZone,
-    locale: Locale) extends DateFormatter {
+    locale: Locale) extends DateFormatter with FormatterUtils {
+
+  val zoneId = ZoneId.of("UTC")
+
+  val formatter = buildFormatter(pattern, locale)
 
-  val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale)
+  def toInstant(s: String): Instant = {
+    val temporalAccessor = formatter.parse(s)
+    toInstantWithZoneId(temporalAccessor)
+  }
 
   override def parse(s: String): Int = {
-    val seconds = dateTimeFormatter.toInstant(s).getEpochSecond
+    val seconds = toInstant(s).getEpochSecond
     val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)
 
     days.toInt
@@ -130,15 +149,12 @@ class Iso8601DateFormatter(
 
   override def format(days: Int): String = {
     val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
-    dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant)
+    formatter.withZone(zoneId).format(instant)
   }
 }
 
-class LegacyDateFormatter(
-    pattern: String,
-    timeZone: TimeZone,
-    locale: Locale) extends DateFormatter {
-  val format = FastDateFormat.getInstance(pattern, timeZone, locale)
+class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter {
+  val format = FastDateFormat.getInstance(pattern, locale)
 
   def parse(s: String): Int = {
     val milliseconds = format.parse(s).getTime
@@ -153,8 +169,7 @@ class LegacyDateFormatter(
 
 class LegacyFallbackDateFormatter(
     pattern: String,
-    timeZone: TimeZone,
-    locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) {
+    locale: Locale) extends LegacyDateFormatter(pattern, locale) {
   override def parse(s: String): Int = {
     Try(super.parse(s)).orElse {
       // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
@@ -169,11 +184,11 @@ class LegacyFallbackDateFormatter(
 }
 
 object DateFormatter {
-  def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = {
+  def apply(format: String, locale: Locale): DateFormatter = {
     if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyFallbackDateFormatter(format, timeZone, locale)
+      new LegacyFallbackDateFormatter(format, locale)
     } else {
-      new Iso8601DateFormatter(format, timeZone, locale)
+      new Iso8601DateFormatter(format, locale)
     }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala
deleted file mode 100644
index 02d4ee0..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.util
-
-import java.util.{Locale, TimeZone}
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils}
-
-class DateTimeFormatterSuite  extends SparkFunSuite {
-  test("parsing dates using time zones") {
-    val localDate = "2018-12-02"
-    val expectedDays = Map(
-      "UTC" -> 17867,
-      "PST" -> 17867,
-      "CET" -> 17866,
-      "Africa/Dakar" -> 17867,
-      "America/Los_Angeles" -> 17867,
-      "Antarctica/Vostok" -> 17866,
-      "Asia/Hong_Kong" -> 17866,
-      "Europe/Amsterdam" -> 17866)
-    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
-      val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US)
-      val daysSinceEpoch = formatter.parse(localDate)
-      assert(daysSinceEpoch === expectedDays(timeZone))
-    }
-  }
-
-  test("parsing timestamps using time zones") {
-    val localDate = "2018-12-02T10:11:12.001234"
-    val expectedMicros = Map(
-      "UTC" -> 1543745472001234L,
-      "PST" -> 1543774272001234L,
-      "CET" -> 1543741872001234L,
-      "Africa/Dakar" -> 1543745472001234L,
-      "America/Los_Angeles" -> 1543774272001234L,
-      "Antarctica/Vostok" -> 1543723872001234L,
-      "Asia/Hong_Kong" -> 1543716672001234L,
-      "Europe/Amsterdam" -> 1543741872001234L)
-    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
-      val formatter = DateTimeFormatter(
-        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
-        TimeZone.getTimeZone(timeZone),
-        Locale.US)
-      val microsSinceEpoch = formatter.parse(localDate)
-      assert(microsSinceEpoch === expectedMicros(timeZone))
-    }
-  }
-
-  test("format dates using time zones") {
-    val daysSinceEpoch = 17867
-    val expectedDate = Map(
-      "UTC" -> "2018-12-02",
-      "PST" -> "2018-12-01",
-      "CET" -> "2018-12-02",
-      "Africa/Dakar" -> "2018-12-02",
-      "America/Los_Angeles" -> "2018-12-01",
-      "Antarctica/Vostok" -> "2018-12-02",
-      "Asia/Hong_Kong" -> "2018-12-02",
-      "Europe/Amsterdam" -> "2018-12-02")
-    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
-      val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US)
-      val date = formatter.format(daysSinceEpoch)
-      assert(date === expectedDate(timeZone))
-    }
-  }
-
-  test("format timestamps using time zones") {
-    val microsSinceEpoch = 1543745472001234L
-    val expectedTimestamp = Map(
-      "UTC" -> "2018-12-02T10:11:12.001234",
-      "PST" -> "2018-12-02T02:11:12.001234",
-      "CET" -> "2018-12-02T11:11:12.001234",
-      "Africa/Dakar" -> "2018-12-02T10:11:12.001234",
-      "America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
-      "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
-      "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
-      "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
-    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
-      val formatter = DateTimeFormatter(
-        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
-        TimeZone.getTimeZone(timeZone),
-        Locale.US)
-      val timestamp = formatter.format(microsSinceEpoch)
-      assert(timestamp === expectedTimestamp(timeZone))
-    }
-  }
-}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala
new file mode 100644
index 0000000..43e348c
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import java.util.{Locale, TimeZone}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
+
+class DateTimestampFormatterSuite extends SparkFunSuite with SQLHelper {
+  test("parsing dates") {
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+        val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+        val daysSinceEpoch = formatter.parse("2018-12-02")
+        assert(daysSinceEpoch === 17867)
+      }
+    }
+  }
+
+  test("parsing timestamps using time zones") {
+    val localDate = "2018-12-02T10:11:12.001234"
+    val expectedMicros = Map(
+      "UTC" -> 1543745472001234L,
+      "PST" -> 1543774272001234L,
+      "CET" -> 1543741872001234L,
+      "Africa/Dakar" -> 1543745472001234L,
+      "America/Los_Angeles" -> 1543774272001234L,
+      "Antarctica/Vostok" -> 1543723872001234L,
+      "Asia/Hong_Kong" -> 1543716672001234L,
+      "Europe/Amsterdam" -> 1543741872001234L)
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = TimestampFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+        TimeZone.getTimeZone(timeZone),
+        Locale.US)
+      val microsSinceEpoch = formatter.parse(localDate)
+      assert(microsSinceEpoch === expectedMicros(timeZone))
+    }
+  }
+
+  test("format dates") {
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+        val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+        val date = formatter.format(17867)
+        assert(date === "2018-12-02")
+      }
+    }
+  }
+
+  test("format timestamps using time zones") {
+    val microsSinceEpoch = 1543745472001234L
+    val expectedTimestamp = Map(
+      "UTC" -> "2018-12-02T10:11:12.001234",
+      "PST" -> "2018-12-02T02:11:12.001234",
+      "CET" -> "2018-12-02T11:11:12.001234",
+      "Africa/Dakar" -> "2018-12-02T10:11:12.001234",
+      "America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
+      "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
+      "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
+      "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = TimestampFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+        TimeZone.getTimeZone(timeZone),
+        Locale.US)
+      val timestamp = formatter.format(microsSinceEpoch)
+      assert(timestamp === expectedTimestamp(timeZone))
+    }
+  }
+
+  test("roundtrip timestamp -> micros -> timestamp using timezones") {
+    Seq(
+      -58710115316212000L,
+      -18926315945345679L,
+      -9463427405253013L,
+      -244000001L,
+      0L,
+      99628200102030L,
+      1543749753123456L,
+      2177456523456789L,
+      11858049903010203L).foreach { micros =>
+      DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+        val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
+        val timestamp = formatter.format(micros)
+        val parsed = formatter.parse(timestamp)
+        assert(micros === parsed)
+      }
+    }
+  }
+
+  test("roundtrip micros -> timestamp -> micros using timezones") {
+    Seq(
+      "0109-07-20T18:38:03.788000",
+      "1370-04-01T10:00:54.654321",
+      "1670-02-11T14:09:54.746987",
+      "1969-12-31T23:55:55.999999",
+      "1970-01-01T00:00:00.000000",
+      "1973-02-27T02:30:00.102030",
+      "2018-12-02T11:22:33.123456",
+      "2039-01-01T01:02:03.456789",
+      "2345-10-07T22:45:03.010203").foreach { timestamp =>
+      DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+        val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
+        val micros = formatter.parse(timestamp)
+        val formatted = formatter.format(micros)
+        assert(timestamp === formatted)
+      }
+    }
+  }
+
+  test("roundtrip date -> days -> date") {
+    Seq(
+      "0050-01-01",
+      "0953-02-02",
+      "1423-03-08",
+      "1969-12-31",
+      "1972-08-25",
+      "1975-09-26",
+      "2018-12-12",
+      "2038-01-01",
+      "5010-11-17").foreach { date =>
+      DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+        withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+          val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+          val days = formatter.parse(date)
+          val formatted = formatter.format(days)
+          assert(date === formatted)
+        }
+      }
+    }
+  }
+
+  test("roundtrip days -> date -> days") {
+    Seq(
+      -701265,
+      -371419,
+      -199722,
+      -1,
+      0,
+      967,
+      2094,
+      17877,
+      24837,
+      1110657).foreach { days =>
+      DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+        withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+          val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+          val date = formatter.format(days)
+          val parsed = formatter.parse(date)
+          assert(days === parsed)
+        }
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3330de3..786335b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -57,14 +57,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
 
     val factory = new JsonFactory()
-    def enforceCorrectType(value: Any, dataType: DataType): Any = {
+    def enforceCorrectType(
+        value: Any,
+        dataType: DataType,
+        options: Map[String, String] = Map.empty): Any = {
       val writer = new StringWriter()
       Utils.tryWithResource(factory.createGenerator(writer)) { generator =>
         generator.writeObject(value)
         generator.flush()
       }
 
-      val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
+      val dummyOption = new JSONOptions(options, SQLConf.get.sessionLocalTimeZone)
       val dummySchema = StructType(Seq.empty)
       val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true)
 
@@ -96,19 +99,27 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong * 1000L)),
         enforceCorrectType(intNumber.toLong, TimestampType))
     val strTime = "2014-09-30 12:34:56"
-    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
-        enforceCorrectType(strTime, TimestampType))
+    checkTypePromotion(
+      expected = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
+      enforceCorrectType(strTime, TimestampType,
+        Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss")))
 
     val strDate = "2014-10-15"
     checkTypePromotion(
       DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
 
     val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
-    val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
-        enforceCorrectType(ISO8601Time1, TimestampType))
+        enforceCorrectType(
+          ISO8601Time1,
+          TimestampType,
+          Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SX")))
+    val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
-        enforceCorrectType(ISO8601Time2, TimestampType))
+        enforceCorrectType(
+          ISO8601Time2,
+          TimestampType,
+          Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ssXXX")))
 
     val ISO8601Date = "1970-01-01"
     checkTypePromotion(DateTimeUtils.millisToDays(32400000),
@@ -1440,103 +1451,105 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("backward compatibility") {
-    // This test we make sure our JSON support can read JSON data generated by previous version
-    // of Spark generated through toJSON method and JSON data source.
-    // The data is generated by the following program.
-    // Here are a few notes:
-    //  - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13)
-    //      in the JSON object.
-    //  - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to
-    //      JSON objects generated by those Spark versions (col17).
-    //  - If the type is NullType, we do not write data out.
-
-    // Create the schema.
-    val struct =
-      StructType(
-        StructField("f1", FloatType, true) ::
-          StructField("f2", ArrayType(BooleanType), true) :: Nil)
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") {
+      // This test makes sure our JSON support can read JSON data generated by previous versions
+      // of Spark through the toJSON method and the JSON data source.
+      // The data is generated by the following program.
+      // Here are a few notes:
+      //  - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13)
+      //      in the JSON object.
+      //  - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to
+      //      JSON objects generated by those Spark versions (col17).
+      //  - If the type is NullType, we do not write data out.
+
+      // Create the schema.
+      val struct =
+        StructType(
+          StructField("f1", FloatType, true) ::
+            StructField("f2", ArrayType(BooleanType), true) :: Nil)
 
-    val dataTypes =
-      Seq(
-        StringType, BinaryType, NullType, BooleanType,
-        ByteType, ShortType, IntegerType, LongType,
-        FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-        DateType, TimestampType,
-        ArrayType(IntegerType), MapType(StringType, LongType), struct,
-        new TestUDT.MyDenseVectorUDT())
-    val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-      StructField(s"col$index", dataType, nullable = true)
-    }
-    val schema = StructType(fields)
+      val dataTypes =
+        Seq(
+          StringType, BinaryType, NullType, BooleanType,
+          ByteType, ShortType, IntegerType, LongType,
+          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+          DateType, TimestampType,
+          ArrayType(IntegerType), MapType(StringType, LongType), struct,
+          new TestUDT.MyDenseVectorUDT())
+      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+        StructField(s"col$index", dataType, nullable = true)
+      }
+      val schema = StructType(fields)
 
-    val constantValues =
-      Seq(
-        "a string in binary".getBytes(StandardCharsets.UTF_8),
-        null,
-        true,
-        1.toByte,
-        2.toShort,
-        3,
-        Long.MaxValue,
-        0.25.toFloat,
-        0.75,
-        new java.math.BigDecimal(s"1234.23456"),
-        new java.math.BigDecimal(s"1.23456"),
-        java.sql.Date.valueOf("2015-01-01"),
-        java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
-        Seq(2, 3, 4),
-        Map("a string" -> 2000L),
-        Row(4.75.toFloat, Seq(false, true)),
-        new TestUDT.MyDenseVector(Array(0.25, 2.25, 4.25)))
-    val data =
-      Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil
+      val constantValues =
+        Seq(
+          "a string in binary".getBytes(StandardCharsets.UTF_8),
+          null,
+          true,
+          1.toByte,
+          2.toShort,
+          3,
+          Long.MaxValue,
+          0.25.toFloat,
+          0.75,
+          new java.math.BigDecimal(s"1234.23456"),
+          new java.math.BigDecimal(s"1.23456"),
+          java.sql.Date.valueOf("2015-01-01"),
+          java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
+          Seq(2, 3, 4),
+          Map("a string" -> 2000L),
+          Row(4.75.toFloat, Seq(false, true)),
+          new TestUDT.MyDenseVector(Array(0.25, 2.25, 4.25)))
+      val data =
+        Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil
 
-    // Data generated by previous versions.
-    // scalastyle:off
-    val existingJSONData =
+      // Data generated by previous versions.
+      // scalastyle:off
+      val existingJSONData =
       """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
-    // scalastyle:on
-
-    // Generate data for the current version.
-    val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
-    withTempPath { path =>
-      df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
+        """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+        """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+        """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+        """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+        """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+        """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
+      // scalastyle:on
+
+      // Generate data for the current version.
+      val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
+      withTempPath { path =>
+        df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
 
-      // df.toJSON will convert internal rows to external rows first and then generate
-      // JSON objects. While, df.write.format("json") will write internal rows directly.
-      val allJSON =
+        // df.toJSON converts internal rows to external rows first and then generates
+        // JSON objects, whereas df.write.format("json") writes internal rows directly.
+        val allJSON =
         existingJSONData ++
           df.toJSON.collect() ++
           sparkContext.textFile(path.getCanonicalPath).collect()
 
-      Utils.deleteRecursively(path)
-      sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
-
-      // Read data back with the schema specified.
-      val col0Values =
-        Seq(
-          "Spark 1.2.2",
-          "Spark 1.3.1",
-          "Spark 1.3.1",
-          "Spark 1.4.1",
-          "Spark 1.4.1",
-          "Spark 1.5.0",
-          "Spark 1.5.0",
-          "Spark " + spark.sparkContext.version,
-          "Spark " + spark.sparkContext.version)
-      val expectedResult = col0Values.map { v =>
-        Row.fromSeq(Seq(v) ++ constantValues)
+        Utils.deleteRecursively(path)
+        sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
+
+        // Read data back with the schema specified.
+        val col0Values =
+          Seq(
+            "Spark 1.2.2",
+            "Spark 1.3.1",
+            "Spark 1.3.1",
+            "Spark 1.4.1",
+            "Spark 1.4.1",
+            "Spark 1.5.0",
+            "Spark 1.5.0",
+            "Spark " + spark.sparkContext.version,
+            "Spark " + spark.sparkContext.version)
+        val expectedResult = col0Values.map { v =>
+          Row.fromSeq(Seq(v) ++ constantValues)
+        }
+        checkAnswer(
+          spark.read.format("json").schema(schema).load(path.getCanonicalPath),
+          expectedResult
+        )
       }
-      checkAnswer(
-        spark.read.format("json").schema(schema).load(path.getCanonicalPath),
-        expectedResult
-      )
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 6075f2c..f0f62b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources
 
 import java.io.File
+import java.util.TimeZone
 
 import scala.util.Random
 
@@ -125,56 +126,62 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     } else {
       Seq(false)
     }
-    for (dataType <- supportedDataTypes) {
-      for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) {
-        val extraMessage = if (isParquetDataSource) {
-          s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled"
-        } else {
-          ""
-        }
-        logInfo(s"Testing $dataType data type$extraMessage")
-
-        val extraOptions = Map[String, String](
-          "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString
-        )
-
-        withTempPath { file =>
-          val path = file.getCanonicalPath
-
-          val dataGenerator = RandomDataGenerator.forType(
-            dataType = dataType,
-            nullable = true,
-            new Random(System.nanoTime())
-          ).getOrElse {
-            fail(s"Failed to create data generator for schema $dataType")
+    // TODO: Support new parser too, see SPARK-26374.
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") {
+      for (dataType <- supportedDataTypes) {
+        for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) {
+          val extraMessage = if (isParquetDataSource) {
+            s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled"
+          } else {
+            ""
+          }
+          logInfo(s"Testing $dataType data type$extraMessage")
+
+          val extraOptions = Map[String, String](
+            "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString
+          )
+
+          withTempPath { file =>
+            val path = file.getCanonicalPath
+
+            val seed = System.nanoTime()
+            withClue(s"Random data generated with the seed: ${seed}") {
+              val dataGenerator = RandomDataGenerator.forType(
+                dataType = dataType,
+                nullable = true,
+                new Random(seed)
+              ).getOrElse {
+                fail(s"Failed to create data generator for schema $dataType")
+              }
+
+              // Create a DF for the schema with random data. The index field is used to sort the
+              // DataFrame.  This is a workaround for SPARK-10591.
+              val schema = new StructType()
+                .add("index", IntegerType, nullable = false)
+                .add("col", dataType, nullable = true)
+              val rdd =
+                spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+              val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+              df.write
+                .mode("overwrite")
+                .format(dataSourceName)
+                .option("dataSchema", df.schema.json)
+                .options(extraOptions)
+                .save(path)
+
+              val loadedDF = spark
+                .read
+                .format(dataSourceName)
+                .option("dataSchema", df.schema.json)
+                .schema(df.schema)
+                .options(extraOptions)
+                .load(path)
+                .orderBy("index")
+
+              checkAnswer(loadedDF, df)
+            }
           }
-
-          // Create a DF for the schema with random data. The index field is used to sort the
-          // DataFrame.  This is a workaround for SPARK-10591.
-          val schema = new StructType()
-            .add("index", IntegerType, nullable = false)
-            .add("col", dataType, nullable = true)
-          val rdd =
-            spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
-          val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-
-          df.write
-            .mode("overwrite")
-            .format(dataSourceName)
-            .option("dataSchema", df.schema.json)
-            .options(extraOptions)
-            .save(path)
-
-          val loadedDF = spark
-            .read
-            .format(dataSourceName)
-            .option("dataSchema", df.schema.json)
-            .schema(df.schema)
-            .options(extraOptions)
-            .load(path)
-            .orderBy("index")
-
-          checkAnswer(loadedDF, df)
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org