You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/02/19 03:47:17 UTC
[spark] branch branch-2.4 updated: [SPARK-26740][SQL][BRANCH-2.4]
Read timestamp/date column stats written by Spark 3.0
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 633de74 [SPARK-26740][SQL][BRANCH-2.4] Read timestamp/date column stats written by Spark 3.0
633de74 is described below
commit 633de74b60f2399a68dc6aa2c161dbb5568679e8
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Tue Feb 19 11:46:42 2019 +0800
[SPARK-26740][SQL][BRANCH-2.4] Read timestamp/date column stats written by Spark 3.0
## What changes were proposed in this pull request?
- Backport of #23662 to `branch-2.4`
- Added `TimestampFormatter` and `DateFormatter`
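- Kept the version of column stats written by this branch at `1` to stay backward compatible with previous versions; stats stored with any other version have their timestamp/date min/max values parsed by the new formatters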
## How was this patch tested?
The changes were tested by `StatisticsCollectionSuite` and by `StatisticsSuite`.
Closes #23809 from MaxGekk/column-stats-time-date-2.4.
Lead-authored-by: Maxim Gekk <ma...@gmail.com>
Co-authored-by: Maxim Gekk <ma...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/catalog/interface.scala | 32 ++++--
.../sql/catalyst/plans/logical/Statistics.scala | 7 +-
.../spark/sql/catalyst/util/DateFormatter.scala | 62 +++++++++++
.../catalyst/util/DateTimeFormatterHelper.scala | 78 ++++++++++++++
.../spark/sql/catalyst/util/DateTimeUtils.scala | 15 ++-
.../sql/catalyst/util/TimestampFormatter.scala | 87 +++++++++++++++
.../spark/sql/catalyst/plans/SQLHelper.scala | 64 +++++++++++
.../sql/catalyst/util/DateTimeTestUtils.scala | 11 ++
.../apache/spark/sql/util/DateFormatterSuite.scala | 98 +++++++++++++++++
.../spark/sql/util/TimestampFormatterSuite.scala | 120 +++++++++++++++++++++
10 files changed, 561 insertions(+), 13 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 30ded13..6453264 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -413,7 +413,8 @@ case class CatalogColumnStat(
nullCount: Option[BigInt] = None,
avgLen: Option[Long] = None,
maxLen: Option[Long] = None,
- histogram: Option[Histogram] = None) {
+ histogram: Option[Histogram] = None,
+ version: Int = CatalogColumnStat.VERSION) {
/**
* Returns a map from string to string that can be used to serialize the column stats.
@@ -427,7 +428,7 @@ case class CatalogColumnStat(
*/
def toMap(colName: String): Map[String, String] = {
val map = new scala.collection.mutable.HashMap[String, String]
- map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+ map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", CatalogColumnStat.VERSION.toString)
distinctCount.foreach { v =>
map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
}
@@ -450,12 +451,13 @@ case class CatalogColumnStat(
dataType: DataType): ColumnStat =
ColumnStat(
distinctCount = distinctCount,
- min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
- max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
+ min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType, version)),
+ max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType, version)),
nullCount = nullCount,
avgLen = avgLen,
maxLen = maxLen,
- histogram = histogram)
+ histogram = histogram,
+ version = version)
}
object CatalogColumnStat extends Logging {
@@ -470,14 +472,23 @@ object CatalogColumnStat extends Logging {
private val KEY_MAX_LEN = "maxLen"
private val KEY_HISTOGRAM = "histogram"
+ val VERSION = 1
+
+ private def getTimestampFormatter(): TimestampFormatter = {
+ TimestampFormatter(format = "yyyy-MM-dd HH:mm:ss.SSSSSS", timeZone = DateTimeUtils.TimeZoneUTC)
+ }
+
/**
* Converts from string representation of data type to the corresponding Catalyst data type.
*/
- def fromExternalString(s: String, name: String, dataType: DataType): Any = {
+ def fromExternalString(s: String, name: String, dataType: DataType, version: Int): Any = {
dataType match {
case BooleanType => s.toBoolean
- case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
- case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+ case DateType if version == 1 => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
+ case DateType => DateFormatter().parse(s)
+ case TimestampType if version == 1 =>
+ DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+ case TimestampType => getTimestampFormatter().parse(s)
case ByteType => s.toByte
case ShortType => s.toShort
case IntegerType => s.toInt
@@ -530,7 +541,8 @@ object CatalogColumnStat extends Logging {
nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
- histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize)
+ histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize),
+ version = map(s"${colName}.${KEY_VERSION}").toInt
))
} catch {
case NonFatal(e) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index b3a4886..d0ca9eb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -93,6 +93,7 @@ case class Statistics(
* @param avgLen average length of the values. For fixed-length types, this should be a constant.
* @param maxLen maximum length of the values. For fixed-length types, this should be a constant.
* @param histogram histogram of the values
+ * @param version version of statistics saved to or retrieved from the catalog
*/
case class ColumnStat(
distinctCount: Option[BigInt] = None,
@@ -101,7 +102,8 @@ case class ColumnStat(
nullCount: Option[BigInt] = None,
avgLen: Option[Long] = None,
maxLen: Option[Long] = None,
- histogram: Option[Histogram] = None) {
+ histogram: Option[Histogram] = None,
+ version: Int = CatalogColumnStat.VERSION) {
// Are distinctCount and nullCount statistics defined?
val hasCountStats = distinctCount.isDefined && nullCount.isDefined
@@ -120,7 +122,8 @@ case class ColumnStat(
nullCount = nullCount,
avgLen = avgLen,
maxLen = maxLen,
- histogram = histogram)
+ histogram = histogram,
+ version = version)
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
new file mode 100644
index 0000000..9535a36
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.time.{Instant, ZoneId}
+import java.util.Locale
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.instantToDays
+
+sealed trait DateFormatter extends Serializable {
+ def parse(s: String): Int // returns days since epoch
+ def format(days: Int): String
+}
+
+class Iso8601DateFormatter(
+ pattern: String,
+ locale: Locale) extends DateFormatter with DateTimeFormatterHelper {
+
+ @transient
+ private lazy val formatter = getOrCreateFormatter(pattern, locale)
+ private val UTC = ZoneId.of("UTC")
+
+ private def toInstant(s: String): Instant = {
+ val temporalAccessor = formatter.parse(s)
+ toInstantWithZoneId(temporalAccessor, UTC)
+ }
+
+ override def parse(s: String): Int = instantToDays(toInstant(s))
+
+ override def format(days: Int): String = {
+ val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
+ formatter.withZone(UTC).format(instant)
+ }
+}
+
+object DateFormatter {
+ val defaultPattern: String = "yyyy-MM-dd"
+ val defaultLocale: Locale = Locale.US
+
+ def apply(format: String, locale: Locale): DateFormatter = {
+ new Iso8601DateFormatter(format, locale)
+ }
+
+ def apply(format: String): DateFormatter = apply(format, defaultLocale)
+
+ def apply(): DateFormatter = apply(defaultPattern)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
new file mode 100644
index 0000000..81ad6ad
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.time._
+import java.time.chrono.IsoChronology
+import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle}
+import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
+import java.util.Locale
+
+import com.google.common.cache.CacheBuilder
+
+import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._
+
+trait DateTimeFormatterHelper {
+ protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor, zoneId: ZoneId): Instant = {
+ val localTime = if (temporalAccessor.query(TemporalQueries.localTime) == null) {
+ LocalTime.ofNanoOfDay(0)
+ } else {
+ LocalTime.from(temporalAccessor)
+ }
+ val localDate = LocalDate.from(temporalAccessor)
+ val localDateTime = LocalDateTime.of(localDate, localTime)
+ val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId)
+ Instant.from(zonedDateTime)
+ }
+
+ // Gets a formatter from the cache or creates a new one. The buildFormatter method can be called
+ // a few times with the same parameters in parallel if the cache does not contain a value
+ // associated with those parameters. Since the formatter is immutable, it does not matter.
+ // For this reason, `synchronized` is intentionally omitted in this method so that parallel
+ // calls do not have to synchronize with each other.
+ // The Cache.get method is not used here to avoid creation of additional instances of Callable.
+ protected def getOrCreateFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
+ val key = (pattern, locale)
+ var formatter = cache.getIfPresent(key)
+ if (formatter == null) {
+ formatter = buildFormatter(pattern, locale)
+ cache.put(key, formatter)
+ }
+ formatter
+ }
+}
+
+private object DateTimeFormatterHelper {
+ val cache = CacheBuilder.newBuilder()
+ .maximumSize(128)
+ .build[(String, Locale), DateTimeFormatter]()
+
+ def buildFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
+ new DateTimeFormatterBuilder()
+ .parseCaseInsensitive()
+ .appendPattern(pattern)
+ .parseDefaulting(ChronoField.ERA, 1)
+ .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
+ .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
+ .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+ .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+ .toFormatter(locale)
+ .withChronology(IsoChronology.INSTANCE)
+ .withResolverStyle(ResolverStyle.STRICT)
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 81d7274..f01a769 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.util
import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}
+import java.time.Instant
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}
@@ -50,7 +51,7 @@ object DateTimeUtils {
final val MILLIS_PER_SECOND = 1000L
final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY
-
+ final val NANOS_PER_MICROS = 1000L
final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
// number of days in 400 years
@@ -440,6 +441,18 @@ object DateTimeUtils {
Some(c.getTimeInMillis * 1000 + segments(6))
}
+ def instantToMicros(instant: Instant): Long = {
+ val sec = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND)
+ val result = Math.addExact(sec, instant.getNano / NANOS_PER_MICROS)
+ result
+ }
+
+ def instantToDays(instant: Instant): Int = {
+ val seconds = instant.getEpochSecond
+ val days = Math.floorDiv(seconds, SECONDS_PER_DAY)
+ days.toInt
+ }
+
/**
* Parses a given UTF8 date string to a corresponding [[Int]] value.
* The return type is [[Option]] in order to distinguish between 0 and null. The following
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
new file mode 100644
index 0000000..4ec61e1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.text.ParseException
+import java.time._
+import java.time.format.DateTimeParseException
+import java.time.temporal.TemporalQueries
+import java.util.{Locale, TimeZone}
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.instantToMicros
+
+sealed trait TimestampFormatter extends Serializable {
+ /**
+ * Parses a timestamp in a string and converts it to microseconds.
+ *
+ * @param s - string with timestamp to parse
+ * @return microseconds since epoch.
+ * @throws ParseException can be thrown by legacy parser
+ * @throws DateTimeParseException can be thrown by new parser
+ * @throws DateTimeException unable to obtain local date or time
+ */
+ @throws(classOf[ParseException])
+ @throws(classOf[DateTimeParseException])
+ @throws(classOf[DateTimeException])
+ def parse(s: String): Long
+ def format(us: Long): String
+}
+
+class Iso8601TimestampFormatter(
+ pattern: String,
+ timeZone: TimeZone,
+ locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper {
+ @transient
+ private lazy val formatter = getOrCreateFormatter(pattern, locale)
+
+ private def toInstant(s: String): Instant = {
+ val temporalAccessor = formatter.parse(s)
+ if (temporalAccessor.query(TemporalQueries.offset()) == null) {
+ toInstantWithZoneId(temporalAccessor, timeZone.toZoneId)
+ } else {
+ Instant.from(temporalAccessor)
+ }
+ }
+
+ override def parse(s: String): Long = instantToMicros(toInstant(s))
+
+ override def format(us: Long): String = {
+ val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND)
+ val mos = Math.floorMod(us, DateTimeUtils.MICROS_PER_SECOND)
+ val instant = Instant.ofEpochSecond(secs, mos * DateTimeUtils.NANOS_PER_MICROS)
+
+ formatter.withZone(timeZone.toZoneId).format(instant)
+ }
+}
+
+object TimestampFormatter {
+ val defaultPattern: String = "yyyy-MM-dd HH:mm:ss"
+ val defaultLocale: Locale = Locale.US
+
+ def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
+ new Iso8601TimestampFormatter(format, timeZone, locale)
+ }
+
+ def apply(format: String, timeZone: TimeZone): TimestampFormatter = {
+ apply(format, timeZone, defaultLocale)
+ }
+
+ def apply(timeZone: TimeZone): TimestampFormatter = {
+ apply(defaultPattern, timeZone, defaultLocale)
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala
new file mode 100644
index 0000000..4d869d7
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans
+
+import java.io.File
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+trait SQLHelper {
+
+ /**
+ * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
+ * configurations.
+ */
+ protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+ val conf = SQLConf.get
+ val (keys, values) = pairs.unzip
+ val currentValues = keys.map { key =>
+ if (conf.contains(key)) {
+ Some(conf.getConfString(key))
+ } else {
+ None
+ }
+ }
+ (keys, values).zipped.foreach { (k, v) =>
+ if (SQLConf.staticConfKeys.contains(k)) {
+ throw new AnalysisException(s"Cannot modify the value of a static config: $k")
+ }
+ conf.setConfString(k, v)
+ }
+ try f finally {
+ keys.zip(currentValues).foreach {
+ case (key, Some(value)) => conf.setConfString(key, value)
+ case (key, None) => conf.unsetConf(key)
+ }
+ }
+ }
+
+ /**
+ * Generates a temporary path without creating the actual file/directory, then passes it to `f`.
+ * If a file/directory is created there by `f`, it will be deleted after `f` returns.
+ */
+ protected def withTempPath(f: File => Unit): Unit = {
+ val path = Utils.createTempDir()
+ path.delete()
+ try f(path) finally Utils.deleteRecursively(path)
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
index 0c1feb3..66d8d28 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
@@ -26,6 +26,17 @@ object DateTimeTestUtils {
val ALL_TIMEZONES: Seq[TimeZone] = TimeZone.getAvailableIDs.toSeq.map(TimeZone.getTimeZone)
+ val outstandingTimezonesIds: Seq[String] = Seq(
+ "UTC",
+ "PST",
+ "CET",
+ "Africa/Dakar",
+ "America/Los_Angeles",
+ "Antarctica/Vostok",
+ "Asia/Hong_Kong",
+ "Europe/Amsterdam")
+ val outstandingTimezones: Seq[TimeZone] = outstandingTimezonesIds.map(TimeZone.getTimeZone)
+
def withDefaultTimeZone[T](newDefaultTimeZone: TimeZone)(block: => T): T = {
val originalDefaultTimeZone = TimeZone.getDefault
try {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
new file mode 100644
index 0000000..602542f
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import java.time.LocalDate
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
+
+class DateFormatterSuite extends SparkFunSuite with SQLHelper {
+ test("parsing dates") {
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+ val formatter = DateFormatter()
+ val daysSinceEpoch = formatter.parse("2018-12-02")
+ assert(daysSinceEpoch === 17867)
+ }
+ }
+ }
+
+ test("format dates") {
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+ val formatter = DateFormatter()
+ val date = formatter.format(17867)
+ assert(date === "2018-12-02")
+ }
+ }
+ }
+
+ test("roundtrip date -> days -> date") {
+ Seq(
+ "0050-01-01",
+ "0953-02-02",
+ "1423-03-08",
+ "1969-12-31",
+ "1972-08-25",
+ "1975-09-26",
+ "2018-12-12",
+ "2038-01-01",
+ "5010-11-17").foreach { date =>
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+ val formatter = DateFormatter()
+ val days = formatter.parse(date)
+ val formatted = formatter.format(days)
+ assert(date === formatted)
+ }
+ }
+ }
+ }
+
+ test("roundtrip days -> date -> days") {
+ Seq(
+ -701265,
+ -371419,
+ -199722,
+ -1,
+ 0,
+ 967,
+ 2094,
+ 17877,
+ 24837,
+ 1110657).foreach { days =>
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+ val formatter = DateFormatter()
+ val date = formatter.format(days)
+ val parsed = formatter.parse(date)
+ assert(days === parsed)
+ }
+ }
+ }
+ }
+
+ test("parsing date without explicit day") {
+ val formatter = DateFormatter("yyyy MMM")
+ val daysSinceEpoch = formatter.parse("2018 Dec")
+ assert(daysSinceEpoch === LocalDate.of(2018, 12, 1).toEpochDay)
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
new file mode 100644
index 0000000..192ca13
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import java.time.{LocalDateTime, ZoneOffset}
+import java.util.TimeZone
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, TimestampFormatter}
+
+class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
+
+ test("parsing timestamps using time zones") {
+ val localDate = "2018-12-02T10:11:12.001234"
+ val expectedMicros = Map(
+ "UTC" -> 1543745472001234L,
+ "PST" -> 1543774272001234L,
+ "CET" -> 1543741872001234L,
+ "Africa/Dakar" -> 1543745472001234L,
+ "America/Los_Angeles" -> 1543774272001234L,
+ "Antarctica/Vostok" -> 1543723872001234L,
+ "Asia/Hong_Kong" -> 1543716672001234L,
+ "Europe/Amsterdam" -> 1543741872001234L)
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ val formatter = TimestampFormatter(
+ "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+ TimeZone.getTimeZone(timeZone))
+ val microsSinceEpoch = formatter.parse(localDate)
+ assert(microsSinceEpoch === expectedMicros(timeZone))
+ }
+ }
+
+ test("format timestamps using time zones") {
+ val microsSinceEpoch = 1543745472001234L
+ val expectedTimestamp = Map(
+ "UTC" -> "2018-12-02T10:11:12.001234",
+ "PST" -> "2018-12-02T02:11:12.001234",
+ "CET" -> "2018-12-02T11:11:12.001234",
+ "Africa/Dakar" -> "2018-12-02T10:11:12.001234",
+ "America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
+ "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
+ "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
+ "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ val formatter = TimestampFormatter(
+ "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+ TimeZone.getTimeZone(timeZone))
+ val timestamp = formatter.format(microsSinceEpoch)
+ assert(timestamp === expectedTimestamp(timeZone))
+ }
+ }
+
+ test("roundtrip micros -> timestamp -> micros using timezones") {
+ Seq("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXXXX").foreach { pattern =>
+ Seq(
+ -58710115316212000L,
+ -18926315945345679L,
+ -9463427405253013L,
+ -244000001L,
+ 0L,
+ 99628200102030L,
+ 1543749753123456L,
+ 2177456523456789L,
+ 11858049903010203L).foreach { micros =>
+ DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+ val formatter = TimestampFormatter(pattern, timeZone)
+ val timestamp = formatter.format(micros)
+ val parsed = formatter.parse(timestamp)
+ assert(micros === parsed)
+ }
+ }
+ }
+ }
+
+ test("roundtrip timestamp -> micros -> timestamp using timezones") {
+ Seq(
+ "0109-07-20T18:38:03.788000",
+ "1370-04-01T10:00:54.654321",
+ "1670-02-11T14:09:54.746987",
+ "1969-12-31T23:55:55.999999",
+ "1970-01-01T00:00:00.000000",
+ "1973-02-27T02:30:00.102030",
+ "2018-12-02T11:22:33.123456",
+ "2039-01-01T01:02:03.456789",
+ "2345-10-07T22:45:03.010203").foreach { timestamp =>
+ DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+ val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone)
+ val micros = formatter.parse(timestamp)
+ val formatted = formatter.format(micros)
+ assert(timestamp === formatted)
+ }
+ }
+ }
+
+ test(" case insensitive parsing of am and pm") {
+ val formatter = TimestampFormatter(
+ "yyyy MMM dd hh:mm:ss a",
+ TimeZone.getTimeZone("UTC"))
+ val micros = formatter.parse("2009 Mar 20 11:30:01 am")
+ assert(micros === TimeUnit.SECONDS.toMicros(
+ LocalDateTime.of(2009, 3, 20, 11, 30, 1).toEpochSecond(ZoneOffset.UTC)))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org