Posted to commits@spark.apache.org by we...@apache.org on 2018/12/16 01:32:34 UTC
[spark] branch master updated: [SPARK-26243][SQL] Use java.time API for parsing timestamps and dates from JSON
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8a27952 [SPARK-26243][SQL] Use java.time API for parsing timestamps and dates from JSON
8a27952 is described below
commit 8a27952cdbf492939d9bda59e2f516f574581636
Author: Maxim Gekk <ma...@databricks.com>
AuthorDate: Sun Dec 16 09:32:13 2018 +0800
[SPARK-26243][SQL] Use java.time API for parsing timestamps and dates from JSON
## What changes were proposed in this pull request?
In the PR, I propose to switch to the **java.time API** for parsing timestamps and dates from JSON inputs with microsecond precision. The SQL config `spark.sql.legacy.timeParser.enabled` allows switching back to the previous behavior, which uses `java.text.SimpleDateFormat`/`FastDateFormat` for parsing/generating timestamps/dates.
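For illustration only (not part of the patch), here is a minimal sketch of what the change means at the DataFrame API level, assuming an active `SparkSession` named `spark`; the sample JSON record and column name are made up:

```scala
import org.apache.spark.sql.types._
import spark.implicits._

val schema = new StructType().add("ts", TimestampType)

// With this patch, the timestamp string below is parsed via the java.time API,
// so the microsecond fraction survives the round trip.
val df = spark.read
  .schema(schema)
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
  .json(Seq("""{"ts": "2018-12-16T09:32:13.123456"}""").toDS())

// Revert to the pre-3.0 SimpleDateFormat/FastDateFormat code path if needed:
spark.conf.set("spark.sql.legacy.timeParser.enabled", "true")
```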
## How was this patch tested?
It was tested by `JsonExpressionsSuite`, `JsonFunctionsSuite` and `JsonSuite`.
Closes #23196 from MaxGekk/json-time-parser.
Lead-authored-by: Maxim Gekk <ma...@databricks.com>
Co-authored-by: Maxim Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
docs/sql-migration-guide-upgrade.md | 2 +-
.../spark/sql/catalyst/csv/CSVInferSchema.scala | 6 +-
.../sql/catalyst/csv/UnivocityGenerator.scala | 8 +-
.../spark/sql/catalyst/csv/UnivocityParser.scala | 6 +-
.../spark/sql/catalyst/json/JSONOptions.scala | 10 +-
.../spark/sql/catalyst/json/JacksonGenerator.scala | 14 +-
.../spark/sql/catalyst/json/JacksonParser.scala | 35 +---
...imeFormatter.scala => TimestampFormatter.scala} | 93 ++++++----
.../spark/sql/util/DateTimeFormatterSuite.scala | 103 -----------
.../sql/util/DateTimestampFormatterSuite.scala | 174 ++++++++++++++++++
.../sql/execution/datasources/json/JsonSuite.scala | 201 +++++++++++----------
.../spark/sql/sources/HadoopFsRelationTest.scala | 105 ++++++-----
12 files changed, 422 insertions(+), 335 deletions(-)
diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index 8834e89..115fc65 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -35,7 +35,7 @@ displayTitle: Spark SQL Upgrading Guide
- Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.
- - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
+ - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and generating CSV/JSON content. In Spark version 2.4 and earlier, java.text.SimpleDateFormat is used for the same purpose with fallbacks to the parsing mechanisms of Spark 2.0 and 1.x. For example, `2018-12-08 10:39:21.123` with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 because the timestamp does not match the pattern, but it can be parsed by earlier Spark versions due to a fallback [...]
- In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
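To make the migration note above concrete, a hedged sketch of the behavior difference (not taken from the patch; assumes an active `SparkSession` named `spark` and `import spark.implicits._`):

```scala
import org.apache.spark.sql.types.{StructType, TimestampType}

val schema = new StructType().add("ts", TimestampType)
val input = Seq("""{"ts": "2018-12-08 10:39:21.123"}""").toDS()

// Since Spark 3.0: the value has no 'T' separator, so it does not match the
// pattern below and the field is read as null in the default PERMISSIVE mode.
spark.read.schema(schema)
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS")
  .json(input)
  .show()

// With the legacy parser (Spark 2.4 behavior) the fallback still accepts it:
spark.conf.set("spark.sql.legacy.timeParser.enabled", "true")
```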
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 345dc4d..35ade13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -22,13 +22,13 @@ import scala.util.control.Exception.allCatch
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.expressions.ExprUtils
-import org.apache.spark.sql.catalyst.util.DateTimeFormatter
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.types._
class CSVInferSchema(val options: CSVOptions) extends Serializable {
@transient
- private lazy val timeParser = DateTimeFormatter(
+ private lazy val timestampParser = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
@@ -160,7 +160,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
private def tryParseTimestamp(field: String): DataType = {
// This case infers a custom `dataFormat` is set.
- if ((allCatch opt timeParser.parse(field)).isDefined) {
+ if ((allCatch opt timestampParser.parse(field)).isDefined) {
TimestampType
} else {
tryParseBoolean(field)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index af09cd6..f012d96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -22,7 +22,7 @@ import java.io.Writer
import com.univocity.parsers.csv.CsvWriter
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.types._
class UnivocityGenerator(
@@ -41,18 +41,18 @@ class UnivocityGenerator(
private val valueConverters: Array[ValueConverter] =
schema.map(_.dataType).map(makeConverter).toArray
- private val timeFormatter = DateTimeFormatter(
+ private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
- private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+ private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
private def makeConverter(dataType: DataType): ValueConverter = dataType match {
case DateType =>
(row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal))
case TimestampType =>
- (row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal))
+ (row: InternalRow, ordinal: Int) => timestampFormatter.format(row.getLong(ordinal))
case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 0f375e0..ed08912 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -74,11 +74,11 @@ class UnivocityParser(
private val row = new GenericInternalRow(requiredSchema.length)
- private val timeFormatter = DateTimeFormatter(
+ private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
- private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+ private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
// Retrieve the raw record string.
private def getCurrentInput: UTF8String = {
@@ -158,7 +158,7 @@ class UnivocityParser(
}
case _: TimestampType => (d: String) =>
- nullSafeDatum(d, name, nullable, options)(timeFormatter.parse)
+ nullSafeDatum(d, name, nullable, options)(timestampFormatter.parse)
case _: DateType => (d: String) =>
nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index e10b8a3..eaff3fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -21,7 +21,6 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.util.{Locale, TimeZone}
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
@@ -82,13 +81,10 @@ private[sql] class JSONOptions(
val timeZone: TimeZone = DateTimeUtils.getTimeZone(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
- // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
- val dateFormat: FastDateFormat =
- FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale)
+ val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
- val timestampFormat: FastDateFormat =
- FastDateFormat.getInstance(
- parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale)
+ val timestampFormat: String =
+ parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index d02a2be..951f519 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
-import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
+import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types._
/**
@@ -77,6 +77,12 @@ private[sql] class JacksonGenerator(
private val lineSeparator: String = options.lineSeparatorInWrite
+ private val timestampFormatter = TimestampFormatter(
+ options.timestampFormat,
+ options.timeZone,
+ options.locale)
+ private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+
private def makeWriter(dataType: DataType): ValueWriter = dataType match {
case NullType =>
(row: SpecializedGetters, ordinal: Int) =>
@@ -116,14 +122,12 @@ private[sql] class JacksonGenerator(
case TimestampType =>
(row: SpecializedGetters, ordinal: Int) =>
- val timestampString =
- options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+ val timestampString = timestampFormatter.format(row.getLong(ordinal))
gen.writeString(timestampString)
case DateType =>
(row: SpecializedGetters, ordinal: Int) =>
- val dateString =
- options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+ val dateString = dateFormatter.format(row.getInt(ordinal))
gen.writeString(dateString)
case BinaryType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 7e3bd4d..3f245e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -55,6 +55,12 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)
+ private val timestampFormatter = TimestampFormatter(
+ options.timestampFormat,
+ options.timeZone,
+ options.locale)
+ private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+
/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema. This is a wrapper for the method
@@ -218,17 +224,7 @@ class JacksonParser(
case TimestampType =>
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
- val stringValue = parser.getText
- // This one will lose microseconds parts.
- // See https://issues.apache.org/jira/browse/SPARK-10681.
- Long.box {
- Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
- .getOrElse {
- // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
- // compatibility.
- DateTimeUtils.stringToTime(stringValue).getTime * 1000L
- }
- }
+ timestampFormatter.parse(parser.getText)
case VALUE_NUMBER_INT =>
parser.getLongValue * 1000000L
@@ -237,22 +233,7 @@ class JacksonParser(
case DateType =>
(parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
- val stringValue = parser.getText
- // This one will lose microseconds parts.
- // See https://issues.apache.org/jira/browse/SPARK-10681.x
- Int.box {
- Try(DateTimeUtils.millisToDays(options.dateFormat.parse(stringValue).getTime))
- .orElse {
- // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
- // compatibility.
- Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(stringValue).getTime))
- }
- .getOrElse {
- // In Spark 1.5.0, we store the data as number of days since epoch in string.
- // So, we just convert it to Int.
- stringValue.toInt
- }
- }
+ dateFormatter.parse(parser.getText)
}
case BinaryType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
similarity index 63%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index ad1f413..2b8d22d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.util
import java.time._
import java.time.format.DateTimeFormatterBuilder
-import java.time.temporal.{ChronoField, TemporalQueries}
+import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
import java.util.{Locale, TimeZone}
import scala.util.Try
@@ -28,31 +28,44 @@ import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.internal.SQLConf
-sealed trait DateTimeFormatter {
+sealed trait TimestampFormatter {
def parse(s: String): Long // returns microseconds since epoch
def format(us: Long): String
}
-class Iso8601DateTimeFormatter(
+trait FormatterUtils {
+ protected def zoneId: ZoneId
+ protected def buildFormatter(
+ pattern: String,
+ locale: Locale): java.time.format.DateTimeFormatter = {
+ new DateTimeFormatterBuilder()
+ .appendPattern(pattern)
+ .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
+ .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
+ .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
+ .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+ .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+ .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+ .toFormatter(locale)
+ }
+ protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor): java.time.Instant = {
+ val localDateTime = LocalDateTime.from(temporalAccessor)
+ val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId)
+ Instant.from(zonedDateTime)
+ }
+}
+
+class Iso8601TimestampFormatter(
pattern: String,
timeZone: TimeZone,
- locale: Locale) extends DateTimeFormatter {
- val formatter = new DateTimeFormatterBuilder()
- .appendPattern(pattern)
- .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
- .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
- .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
- .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
- .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
- .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
- .toFormatter(locale)
+ locale: Locale) extends TimestampFormatter with FormatterUtils {
+ val zoneId = timeZone.toZoneId
+ val formatter = buildFormatter(pattern, locale)
def toInstant(s: String): Instant = {
val temporalAccessor = formatter.parse(s)
if (temporalAccessor.query(TemporalQueries.offset()) == null) {
- val localDateTime = LocalDateTime.from(temporalAccessor)
- val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId)
- Instant.from(zonedDateTime)
+ toInstantWithZoneId(temporalAccessor)
} else {
Instant.from(temporalAccessor)
}
@@ -75,10 +88,10 @@ class Iso8601DateTimeFormatter(
}
}
-class LegacyDateTimeFormatter(
+class LegacyTimestampFormatter(
pattern: String,
timeZone: TimeZone,
- locale: Locale) extends DateTimeFormatter {
+ locale: Locale) extends TimestampFormatter {
val format = FastDateFormat.getInstance(pattern, timeZone, locale)
protected def toMillis(s: String): Long = format.parse(s).getTime
@@ -90,21 +103,21 @@ class LegacyDateTimeFormatter(
}
}
-class LegacyFallbackDateTimeFormatter(
+class LegacyFallbackTimestampFormatter(
pattern: String,
timeZone: TimeZone,
- locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) {
+ locale: Locale) extends LegacyTimestampFormatter(pattern, timeZone, locale) {
override def toMillis(s: String): Long = {
Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
}
}
-object DateTimeFormatter {
- def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = {
+object TimestampFormatter {
+ def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
- new LegacyFallbackDateTimeFormatter(format, timeZone, locale)
+ new LegacyFallbackTimestampFormatter(format, timeZone, locale)
} else {
- new Iso8601DateTimeFormatter(format, timeZone, locale)
+ new Iso8601TimestampFormatter(format, timeZone, locale)
}
}
}
@@ -116,13 +129,19 @@ sealed trait DateFormatter {
class Iso8601DateFormatter(
pattern: String,
- timeZone: TimeZone,
- locale: Locale) extends DateFormatter {
+ locale: Locale) extends DateFormatter with FormatterUtils {
+
+ val zoneId = ZoneId.of("UTC")
+
+ val formatter = buildFormatter(pattern, locale)
- val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale)
+ def toInstant(s: String): Instant = {
+ val temporalAccessor = formatter.parse(s)
+ toInstantWithZoneId(temporalAccessor)
+ }
override def parse(s: String): Int = {
- val seconds = dateTimeFormatter.toInstant(s).getEpochSecond
+ val seconds = toInstant(s).getEpochSecond
val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)
days.toInt
@@ -130,15 +149,12 @@ class Iso8601DateFormatter(
override def format(days: Int): String = {
val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
- dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant)
+ formatter.withZone(zoneId).format(instant)
}
}
-class LegacyDateFormatter(
- pattern: String,
- timeZone: TimeZone,
- locale: Locale) extends DateFormatter {
- val format = FastDateFormat.getInstance(pattern, timeZone, locale)
+class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter {
+ val format = FastDateFormat.getInstance(pattern, locale)
def parse(s: String): Int = {
val milliseconds = format.parse(s).getTime
@@ -153,8 +169,7 @@ class LegacyDateFormatter(
class LegacyFallbackDateFormatter(
pattern: String,
- timeZone: TimeZone,
- locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) {
+ locale: Locale) extends LegacyDateFormatter(pattern, locale) {
override def parse(s: String): Int = {
Try(super.parse(s)).orElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
@@ -169,11 +184,11 @@ class LegacyFallbackDateFormatter(
}
object DateFormatter {
- def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = {
+ def apply(format: String, locale: Locale): DateFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
- new LegacyFallbackDateFormatter(format, timeZone, locale)
+ new LegacyFallbackDateFormatter(format, locale)
} else {
- new Iso8601DateFormatter(format, timeZone, locale)
+ new Iso8601DateFormatter(format, locale)
}
}
}
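As a usage sketch of the renamed formatter factories (mirroring the new DateTimestampFormatterSuite further below; not part of the patch itself):

```scala
import java.util.{Locale, TimeZone}
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}

val tsFormatter = TimestampFormatter(
  "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", TimeZone.getTimeZone("UTC"), Locale.US)
// parse() returns microseconds since the epoch; format() is its inverse.
val micros = tsFormatter.parse("2018-12-02T10:11:12.001234")
assert(micros == 1543745472001234L)
assert(tsFormatter.format(micros) == "2018-12-02T10:11:12.001234")

// DateFormatter no longer takes a TimeZone; dates are resolved against UTC
// and parse() returns days since the epoch.
val dateFormatter = DateFormatter("yyyy-MM-dd", Locale.US)
assert(dateFormatter.parse("2018-12-02") == 17867)
assert(dateFormatter.format(17867) == "2018-12-02")
```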
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala
deleted file mode 100644
index 02d4ee0..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.util
-
-import java.util.{Locale, TimeZone}
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils}
-
-class DateTimeFormatterSuite extends SparkFunSuite {
- test("parsing dates using time zones") {
- val localDate = "2018-12-02"
- val expectedDays = Map(
- "UTC" -> 17867,
- "PST" -> 17867,
- "CET" -> 17866,
- "Africa/Dakar" -> 17867,
- "America/Los_Angeles" -> 17867,
- "Antarctica/Vostok" -> 17866,
- "Asia/Hong_Kong" -> 17866,
- "Europe/Amsterdam" -> 17866)
- DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
- val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US)
- val daysSinceEpoch = formatter.parse(localDate)
- assert(daysSinceEpoch === expectedDays(timeZone))
- }
- }
-
- test("parsing timestamps using time zones") {
- val localDate = "2018-12-02T10:11:12.001234"
- val expectedMicros = Map(
- "UTC" -> 1543745472001234L,
- "PST" -> 1543774272001234L,
- "CET" -> 1543741872001234L,
- "Africa/Dakar" -> 1543745472001234L,
- "America/Los_Angeles" -> 1543774272001234L,
- "Antarctica/Vostok" -> 1543723872001234L,
- "Asia/Hong_Kong" -> 1543716672001234L,
- "Europe/Amsterdam" -> 1543741872001234L)
- DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
- val formatter = DateTimeFormatter(
- "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
- TimeZone.getTimeZone(timeZone),
- Locale.US)
- val microsSinceEpoch = formatter.parse(localDate)
- assert(microsSinceEpoch === expectedMicros(timeZone))
- }
- }
-
- test("format dates using time zones") {
- val daysSinceEpoch = 17867
- val expectedDate = Map(
- "UTC" -> "2018-12-02",
- "PST" -> "2018-12-01",
- "CET" -> "2018-12-02",
- "Africa/Dakar" -> "2018-12-02",
- "America/Los_Angeles" -> "2018-12-01",
- "Antarctica/Vostok" -> "2018-12-02",
- "Asia/Hong_Kong" -> "2018-12-02",
- "Europe/Amsterdam" -> "2018-12-02")
- DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
- val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US)
- val date = formatter.format(daysSinceEpoch)
- assert(date === expectedDate(timeZone))
- }
- }
-
- test("format timestamps using time zones") {
- val microsSinceEpoch = 1543745472001234L
- val expectedTimestamp = Map(
- "UTC" -> "2018-12-02T10:11:12.001234",
- "PST" -> "2018-12-02T02:11:12.001234",
- "CET" -> "2018-12-02T11:11:12.001234",
- "Africa/Dakar" -> "2018-12-02T10:11:12.001234",
- "America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
- "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
- "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
- "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
- DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
- val formatter = DateTimeFormatter(
- "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
- TimeZone.getTimeZone(timeZone),
- Locale.US)
- val timestamp = formatter.format(microsSinceEpoch)
- assert(timestamp === expectedTimestamp(timeZone))
- }
- }
-}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala
new file mode 100644
index 0000000..43e348c
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import java.util.{Locale, TimeZone}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
+
+class DateTimestampFormatterSuite extends SparkFunSuite with SQLHelper {
+ test("parsing dates") {
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+ val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+ val daysSinceEpoch = formatter.parse("2018-12-02")
+ assert(daysSinceEpoch === 17867)
+ }
+ }
+ }
+
+ test("parsing timestamps using time zones") {
+ val localDate = "2018-12-02T10:11:12.001234"
+ val expectedMicros = Map(
+ "UTC" -> 1543745472001234L,
+ "PST" -> 1543774272001234L,
+ "CET" -> 1543741872001234L,
+ "Africa/Dakar" -> 1543745472001234L,
+ "America/Los_Angeles" -> 1543774272001234L,
+ "Antarctica/Vostok" -> 1543723872001234L,
+ "Asia/Hong_Kong" -> 1543716672001234L,
+ "Europe/Amsterdam" -> 1543741872001234L)
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ val formatter = TimestampFormatter(
+ "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+ TimeZone.getTimeZone(timeZone),
+ Locale.US)
+ val microsSinceEpoch = formatter.parse(localDate)
+ assert(microsSinceEpoch === expectedMicros(timeZone))
+ }
+ }
+
+ test("format dates") {
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+ val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+ val date = formatter.format(17867)
+ assert(date === "2018-12-02")
+ }
+ }
+ }
+
+ test("format timestamps using time zones") {
+ val microsSinceEpoch = 1543745472001234L
+ val expectedTimestamp = Map(
+ "UTC" -> "2018-12-02T10:11:12.001234",
+ "PST" -> "2018-12-02T02:11:12.001234",
+ "CET" -> "2018-12-02T11:11:12.001234",
+ "Africa/Dakar" -> "2018-12-02T10:11:12.001234",
+ "America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
+ "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
+ "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
+ "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ val formatter = TimestampFormatter(
+ "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+ TimeZone.getTimeZone(timeZone),
+ Locale.US)
+ val timestamp = formatter.format(microsSinceEpoch)
+ assert(timestamp === expectedTimestamp(timeZone))
+ }
+ }
+
+ test("roundtrip timestamp -> micros -> timestamp using timezones") {
+ Seq(
+ -58710115316212000L,
+ -18926315945345679L,
+ -9463427405253013L,
+ -244000001L,
+ 0L,
+ 99628200102030L,
+ 1543749753123456L,
+ 2177456523456789L,
+ 11858049903010203L).foreach { micros =>
+ DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+ val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
+ val timestamp = formatter.format(micros)
+ val parsed = formatter.parse(timestamp)
+ assert(micros === parsed)
+ }
+ }
+ }
+
+ test("roundtrip micros -> timestamp -> micros using timezones") {
+ Seq(
+ "0109-07-20T18:38:03.788000",
+ "1370-04-01T10:00:54.654321",
+ "1670-02-11T14:09:54.746987",
+ "1969-12-31T23:55:55.999999",
+ "1970-01-01T00:00:00.000000",
+ "1973-02-27T02:30:00.102030",
+ "2018-12-02T11:22:33.123456",
+ "2039-01-01T01:02:03.456789",
+ "2345-10-07T22:45:03.010203").foreach { timestamp =>
+ DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+ val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
+ val micros = formatter.parse(timestamp)
+ val formatted = formatter.format(micros)
+ assert(timestamp === formatted)
+ }
+ }
+ }
+
+ test("roundtrip date -> days -> date") {
+ Seq(
+ "0050-01-01",
+ "0953-02-02",
+ "1423-03-08",
+ "1969-12-31",
+ "1972-08-25",
+ "1975-09-26",
+ "2018-12-12",
+ "2038-01-01",
+ "5010-11-17").foreach { date =>
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+ val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+ val days = formatter.parse(date)
+ val formatted = formatter.format(days)
+ assert(date === formatted)
+ }
+ }
+ }
+ }
+
+ test("roundtrip days -> date -> days") {
+ Seq(
+ -701265,
+ -371419,
+ -199722,
+ -1,
+ 0,
+ 967,
+ 2094,
+ 17877,
+ 24837,
+ 1110657).foreach { days =>
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+ val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+ val date = formatter.format(days)
+ val parsed = formatter.parse(date)
+ assert(days === parsed)
+ }
+ }
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3330de3..786335b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -57,14 +57,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
val factory = new JsonFactory()
- def enforceCorrectType(value: Any, dataType: DataType): Any = {
+ def enforceCorrectType(
+ value: Any,
+ dataType: DataType,
+ options: Map[String, String] = Map.empty): Any = {
val writer = new StringWriter()
Utils.tryWithResource(factory.createGenerator(writer)) { generator =>
generator.writeObject(value)
generator.flush()
}
- val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
+ val dummyOption = new JSONOptions(options, SQLConf.get.sessionLocalTimeZone)
val dummySchema = StructType(Seq.empty)
val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true)
@@ -96,19 +99,27 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong * 1000L)),
enforceCorrectType(intNumber.toLong, TimestampType))
val strTime = "2014-09-30 12:34:56"
- checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
- enforceCorrectType(strTime, TimestampType))
+ checkTypePromotion(
+ expected = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
+ enforceCorrectType(strTime, TimestampType,
+ Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss")))
val strDate = "2014-10-15"
checkTypePromotion(
DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
- val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
- enforceCorrectType(ISO8601Time1, TimestampType))
+ enforceCorrectType(
+ ISO8601Time1,
+ TimestampType,
+ Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SX")))
+ val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
- enforceCorrectType(ISO8601Time2, TimestampType))
+ enforceCorrectType(
+ ISO8601Time2,
+ TimestampType,
+ Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ssXXX")))
val ISO8601Date = "1970-01-01"
checkTypePromotion(DateTimeUtils.millisToDays(32400000),
@@ -1440,103 +1451,105 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
test("backward compatibility") {
- // This test we make sure our JSON support can read JSON data generated by previous version
- // of Spark generated through toJSON method and JSON data source.
- // The data is generated by the following program.
- // Here are a few notes:
- // - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13)
- // in the JSON object.
- // - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to
- // JSON objects generated by those Spark versions (col17).
- // - If the type is NullType, we do not write data out.
-
- // Create the schema.
- val struct =
- StructType(
- StructField("f1", FloatType, true) ::
- StructField("f2", ArrayType(BooleanType), true) :: Nil)
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") {
+ // This test we make sure our JSON support can read JSON data generated by previous version
+ // of Spark generated through toJSON method and JSON data source.
+ // The data is generated by the following program.
+ // Here are a few notes:
+ // - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13)
+ // in the JSON object.
+ // - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to
+ // JSON objects generated by those Spark versions (col17).
+ // - If the type is NullType, we do not write data out.
+
+ // Create the schema.
+ val struct =
+ StructType(
+ StructField("f1", FloatType, true) ::
+ StructField("f2", ArrayType(BooleanType), true) :: Nil)
- val dataTypes =
- Seq(
- StringType, BinaryType, NullType, BooleanType,
- ByteType, ShortType, IntegerType, LongType,
- FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
- DateType, TimestampType,
- ArrayType(IntegerType), MapType(StringType, LongType), struct,
- new TestUDT.MyDenseVectorUDT())
- val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
- StructField(s"col$index", dataType, nullable = true)
- }
- val schema = StructType(fields)
+ val dataTypes =
+ Seq(
+ StringType, BinaryType, NullType, BooleanType,
+ ByteType, ShortType, IntegerType, LongType,
+ FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+ DateType, TimestampType,
+ ArrayType(IntegerType), MapType(StringType, LongType), struct,
+ new TestUDT.MyDenseVectorUDT())
+ val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+ StructField(s"col$index", dataType, nullable = true)
+ }
+ val schema = StructType(fields)
- val constantValues =
- Seq(
- "a string in binary".getBytes(StandardCharsets.UTF_8),
- null,
- true,
- 1.toByte,
- 2.toShort,
- 3,
- Long.MaxValue,
- 0.25.toFloat,
- 0.75,
- new java.math.BigDecimal(s"1234.23456"),
- new java.math.BigDecimal(s"1.23456"),
- java.sql.Date.valueOf("2015-01-01"),
- java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
- Seq(2, 3, 4),
- Map("a string" -> 2000L),
- Row(4.75.toFloat, Seq(false, true)),
- new TestUDT.MyDenseVector(Array(0.25, 2.25, 4.25)))
- val data =
- Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil
+ val constantValues =
+ Seq(
+ "a string in binary".getBytes(StandardCharsets.UTF_8),
+ null,
+ true,
+ 1.toByte,
+ 2.toShort,
+ 3,
+ Long.MaxValue,
+ 0.25.toFloat,
+ 0.75,
+ new java.math.BigDecimal(s"1234.23456"),
+ new java.math.BigDecimal(s"1.23456"),
+ java.sql.Date.valueOf("2015-01-01"),
+ java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
+ Seq(2, 3, 4),
+ Map("a string" -> 2000L),
+ Row(4.75.toFloat, Seq(false, true)),
+ new TestUDT.MyDenseVector(Array(0.25, 2.25, 4.25)))
+ val data =
+ Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil
- // Data generated by previous versions.
- // scalastyle:off
- val existingJSONData =
+ // Data generated by previous versions.
+ // scalastyle:off
+ val existingJSONData =
"""{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
- """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
- """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
- """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
- """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
- """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
- """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
- // scalastyle:on
-
- // Generate data for the current version.
- val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
- withTempPath { path =>
- df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
+ """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+ """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+ """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+ """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+ """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+ """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
+ // scalastyle:on
+
+ // Generate data for the current version.
+ val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
+ withTempPath { path =>
+ df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
- // df.toJSON will convert internal rows to external rows first and then generate
- // JSON objects. While, df.write.format("json") will write internal rows directly.
- val allJSON =
+ // df.toJSON will convert internal rows to external rows first and then generate
+ // JSON objects. While, df.write.format("json") will write internal rows directly.
+ val allJSON =
existingJSONData ++
df.toJSON.collect() ++
sparkContext.textFile(path.getCanonicalPath).collect()
- Utils.deleteRecursively(path)
- sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
-
- // Read data back with the schema specified.
- val col0Values =
- Seq(
- "Spark 1.2.2",
- "Spark 1.3.1",
- "Spark 1.3.1",
- "Spark 1.4.1",
- "Spark 1.4.1",
- "Spark 1.5.0",
- "Spark 1.5.0",
- "Spark " + spark.sparkContext.version,
- "Spark " + spark.sparkContext.version)
- val expectedResult = col0Values.map { v =>
- Row.fromSeq(Seq(v) ++ constantValues)
+ Utils.deleteRecursively(path)
+ sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
+
+ // Read data back with the schema specified.
+ val col0Values =
+ Seq(
+ "Spark 1.2.2",
+ "Spark 1.3.1",
+ "Spark 1.3.1",
+ "Spark 1.4.1",
+ "Spark 1.4.1",
+ "Spark 1.5.0",
+ "Spark 1.5.0",
+ "Spark " + spark.sparkContext.version,
+ "Spark " + spark.sparkContext.version)
+ val expectedResult = col0Values.map { v =>
+ Row.fromSeq(Seq(v) ++ constantValues)
+ }
+ checkAnswer(
+ spark.read.format("json").schema(schema).load(path.getCanonicalPath),
+ expectedResult
+ )
}
- checkAnswer(
- spark.read.format("json").schema(schema).load(path.getCanonicalPath),
- expectedResult
- )
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 6075f2c..f0f62b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.sources
import java.io.File
+import java.util.TimeZone
import scala.util.Random
@@ -125,56 +126,62 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
} else {
Seq(false)
}
- for (dataType <- supportedDataTypes) {
- for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) {
- val extraMessage = if (isParquetDataSource) {
- s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled"
- } else {
- ""
- }
- logInfo(s"Testing $dataType data type$extraMessage")
-
- val extraOptions = Map[String, String](
- "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString
- )
-
- withTempPath { file =>
- val path = file.getCanonicalPath
-
- val dataGenerator = RandomDataGenerator.forType(
- dataType = dataType,
- nullable = true,
- new Random(System.nanoTime())
- ).getOrElse {
- fail(s"Failed to create data generator for schema $dataType")
+ // TODO: Support new parser too, see SPARK-26374.
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") {
+ for (dataType <- supportedDataTypes) {
+ for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) {
+ val extraMessage = if (isParquetDataSource) {
+ s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled"
+ } else {
+ ""
+ }
+ logInfo(s"Testing $dataType data type$extraMessage")
+
+ val extraOptions = Map[String, String](
+ "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString
+ )
+
+ withTempPath { file =>
+ val path = file.getCanonicalPath
+
+ val seed = System.nanoTime()
+ withClue(s"Random data generated with the seed: ${seed}") {
+ val dataGenerator = RandomDataGenerator.forType(
+ dataType = dataType,
+ nullable = true,
+ new Random(seed)
+ ).getOrElse {
+ fail(s"Failed to create data generator for schema $dataType")
+ }
+
+ // Create a DF for the schema with random data. The index field is used to sort the
+ // DataFrame. This is a workaround for SPARK-10591.
+ val schema = new StructType()
+ .add("index", IntegerType, nullable = false)
+ .add("col", dataType, nullable = true)
+ val rdd =
+ spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+ val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+ df.write
+ .mode("overwrite")
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .options(extraOptions)
+ .save(path)
+
+ val loadedDF = spark
+ .read
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .schema(df.schema)
+ .options(extraOptions)
+ .load(path)
+ .orderBy("index")
+
+ checkAnswer(loadedDF, df)
+ }
}
-
- // Create a DF for the schema with random data. The index field is used to sort the
- // DataFrame. This is a workaround for SPARK-10591.
- val schema = new StructType()
- .add("index", IntegerType, nullable = false)
- .add("col", dataType, nullable = true)
- val rdd =
- spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
- val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-
- df.write
- .mode("overwrite")
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .options(extraOptions)
- .save(path)
-
- val loadedDF = spark
- .read
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .schema(df.schema)
- .options(extraOptions)
- .load(path)
- .orderBy("index")
-
- checkAnswer(loadedDF, df)
}
}
}