Posted to commits@spark.apache.org by we...@apache.org on 2019/02/19 03:47:17 UTC

[spark] branch branch-2.4 updated: [SPARK-26740][SQL][BRANCH-2.4] Read timestamp/date column stats written by Spark 3.0

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 633de74  [SPARK-26740][SQL][BRANCH-2.4] Read timestamp/date column stats written by Spark 3.0
633de74 is described below

commit 633de74b60f2399a68dc6aa2c161dbb5568679e8
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Tue Feb 19 11:46:42 2019 +0800

    [SPARK-26740][SQL][BRANCH-2.4] Read timestamp/date column stats written by Spark 3.0
    
    ## What changes were proposed in this pull request?
    
    - Backport of #23662 to `branch-2.4`
    - Added `TimestampFormatter` and `DateFormatter`
    - Set version of column stats to `1` to keep backward compatibility with previous versions (see the sketch below)
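    
    A minimal, illustrative sketch of the versioned read path (not part of the patch; the wrapper object `ColumnStatVersionSketch`, the column name `eventTime`, and the version value `2`, standing in for whatever version Spark 3.0 writes, are assumptions for the example):
    
    ```scala
    import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
    import org.apache.spark.sql.types.TimestampType
    
    object ColumnStatVersionSketch {
      // version == 1: stats written by Spark 2.x, parsed via java.sql.Timestamp.valueOf
      val legacyMicros: Any = CatalogColumnStat.fromExternalString(
        "2019-02-19 03:47:17.0", "eventTime", TimestampType, version = 1)
    
      // version > 1: stats written by Spark 3.0, parsed with the new TimestampFormatter
      // using the pattern "yyyy-MM-dd HH:mm:ss.SSSSSS" in UTC
      val newMicros: Any = CatalogColumnStat.fromExternalString(
        "2019-02-19 03:47:17.000001", "eventTime", TimestampType, version = 2)
    }
    ```
    
    Stats serialized by this branch still carry version `1` (`CatalogColumnStat.VERSION`), so metadata written here stays readable by previous releases.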
    
    ## How was this patch tested?
    
    The changes were tested by `StatisticsCollectionSuite` and by `StatisticsSuite`.
    
    Closes #23809 from MaxGekk/column-stats-time-date-2.4.
    
    Lead-authored-by: Maxim Gekk <ma...@gmail.com>
    Co-authored-by: Maxim Gekk <ma...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/catalog/interface.scala     |  32 ++++--
 .../sql/catalyst/plans/logical/Statistics.scala    |   7 +-
 .../spark/sql/catalyst/util/DateFormatter.scala    |  62 +++++++++++
 .../catalyst/util/DateTimeFormatterHelper.scala    |  78 ++++++++++++++
 .../spark/sql/catalyst/util/DateTimeUtils.scala    |  15 ++-
 .../sql/catalyst/util/TimestampFormatter.scala     |  87 +++++++++++++++
 .../spark/sql/catalyst/plans/SQLHelper.scala       |  64 +++++++++++
 .../sql/catalyst/util/DateTimeTestUtils.scala      |  11 ++
 .../apache/spark/sql/util/DateFormatterSuite.scala |  98 +++++++++++++++++
 .../spark/sql/util/TimestampFormatterSuite.scala   | 120 +++++++++++++++++++++
 10 files changed, 561 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 30ded13..6453264 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -413,7 +413,8 @@ case class CatalogColumnStat(
     nullCount: Option[BigInt] = None,
     avgLen: Option[Long] = None,
     maxLen: Option[Long] = None,
-    histogram: Option[Histogram] = None) {
+    histogram: Option[Histogram] = None,
+    version: Int = CatalogColumnStat.VERSION) {
 
   /**
    * Returns a map from string to string that can be used to serialize the column stats.
@@ -427,7 +428,7 @@ case class CatalogColumnStat(
    */
   def toMap(colName: String): Map[String, String] = {
     val map = new scala.collection.mutable.HashMap[String, String]
-    map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+    map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", CatalogColumnStat.VERSION.toString)
     distinctCount.foreach { v =>
       map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
     }
@@ -450,12 +451,13 @@ case class CatalogColumnStat(
       dataType: DataType): ColumnStat =
     ColumnStat(
       distinctCount = distinctCount,
-      min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
-      max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
+      min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType, version)),
+      max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType, version)),
       nullCount = nullCount,
       avgLen = avgLen,
       maxLen = maxLen,
-      histogram = histogram)
+      histogram = histogram,
+      version = version)
 }
 
 object CatalogColumnStat extends Logging {
@@ -470,14 +472,23 @@ object CatalogColumnStat extends Logging {
   private val KEY_MAX_LEN = "maxLen"
   private val KEY_HISTOGRAM = "histogram"
 
+  val VERSION = 1
+
+  private def getTimestampFormatter(): TimestampFormatter = {
+    TimestampFormatter(format = "yyyy-MM-dd HH:mm:ss.SSSSSS", timeZone = DateTimeUtils.TimeZoneUTC)
+  }
+
   /**
    * Converts from string representation of data type to the corresponding Catalyst data type.
    */
-  def fromExternalString(s: String, name: String, dataType: DataType): Any = {
+  def fromExternalString(s: String, name: String, dataType: DataType, version: Int): Any = {
     dataType match {
       case BooleanType => s.toBoolean
-      case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
-      case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+      case DateType if version == 1 => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
+      case DateType => DateFormatter().parse(s)
+      case TimestampType if version == 1 =>
+        DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+      case TimestampType => getTimestampFormatter().parse(s)
       case ByteType => s.toByte
       case ShortType => s.toShort
       case IntegerType => s.toInt
@@ -530,7 +541,8 @@ object CatalogColumnStat extends Logging {
         nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
         avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
         maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
-        histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize)
+        histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize),
+        version = map(s"${colName}.${KEY_VERSION}").toInt
       ))
     } catch {
       case NonFatal(e) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index b3a4886..d0ca9eb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -93,6 +93,7 @@ case class Statistics(
  * @param avgLen average length of the values. For fixed-length types, this should be a constant.
  * @param maxLen maximum length of the values. For fixed-length types, this should be a constant.
  * @param histogram histogram of the values
+ * @param version version of statistics saved to or retrieved from the catalog
  */
 case class ColumnStat(
     distinctCount: Option[BigInt] = None,
@@ -101,7 +102,8 @@ case class ColumnStat(
     nullCount: Option[BigInt] = None,
     avgLen: Option[Long] = None,
     maxLen: Option[Long] = None,
-    histogram: Option[Histogram] = None) {
+    histogram: Option[Histogram] = None,
+    version: Int = CatalogColumnStat.VERSION) {
 
   // Are distinctCount and nullCount statistics defined?
   val hasCountStats = distinctCount.isDefined && nullCount.isDefined
@@ -120,7 +122,8 @@ case class ColumnStat(
       nullCount = nullCount,
       avgLen = avgLen,
       maxLen = maxLen,
-      histogram = histogram)
+      histogram = histogram,
+      version = version)
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
new file mode 100644
index 0000000..9535a36
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.time.{Instant, ZoneId}
+import java.util.Locale
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.instantToDays
+
+sealed trait DateFormatter extends Serializable {
+  def parse(s: String): Int // returns days since epoch
+  def format(days: Int): String
+}
+
+class Iso8601DateFormatter(
+    pattern: String,
+    locale: Locale) extends DateFormatter with DateTimeFormatterHelper {
+
+  @transient
+  private lazy val formatter = getOrCreateFormatter(pattern, locale)
+  private val UTC = ZoneId.of("UTC")
+
+  private def toInstant(s: String): Instant = {
+    val temporalAccessor = formatter.parse(s)
+    toInstantWithZoneId(temporalAccessor, UTC)
+  }
+
+  override def parse(s: String): Int = instantToDays(toInstant(s))
+
+  override def format(days: Int): String = {
+    val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
+    formatter.withZone(UTC).format(instant)
+  }
+}
+
+object DateFormatter {
+  val defaultPattern: String = "yyyy-MM-dd"
+  val defaultLocale: Locale = Locale.US
+
+  def apply(format: String, locale: Locale): DateFormatter = {
+    new Iso8601DateFormatter(format, locale)
+  }
+
+  def apply(format: String): DateFormatter = apply(format, defaultLocale)
+
+  def apply(): DateFormatter = apply(defaultPattern)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
new file mode 100644
index 0000000..81ad6ad
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.time._
+import java.time.chrono.IsoChronology
+import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle}
+import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
+import java.util.Locale
+
+import com.google.common.cache.CacheBuilder
+
+import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._
+
+trait DateTimeFormatterHelper {
+  protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor, zoneId: ZoneId): Instant = {
+    val localTime = if (temporalAccessor.query(TemporalQueries.localTime) == null) {
+      LocalTime.ofNanoOfDay(0)
+    } else {
+      LocalTime.from(temporalAccessor)
+    }
+    val localDate = LocalDate.from(temporalAccessor)
+    val localDateTime = LocalDateTime.of(localDate, localTime)
+    val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId)
+    Instant.from(zonedDateTime)
+  }
+
+  // Gets a formatter from the cache or creates a new one. The buildFormatter method may be
+  // called several times in parallel with the same parameters if the cache does not yet
+  // contain a value for them; since the formatter is immutable, that is harmless.
+  // Synchronization is therefore intentionally omitted from this method so that
+  // concurrent callers do not block each other.
+  // Cache.get is not used here to avoid creating extra Callable instances.
+  protected def getOrCreateFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
+    val key = (pattern, locale)
+    var formatter = cache.getIfPresent(key)
+    if (formatter == null) {
+      formatter = buildFormatter(pattern, locale)
+      cache.put(key, formatter)
+    }
+    formatter
+  }
+}
+
+private object DateTimeFormatterHelper {
+  val cache = CacheBuilder.newBuilder()
+    .maximumSize(128)
+    .build[(String, Locale), DateTimeFormatter]()
+
+  def buildFormatter(pattern: String, locale: Locale): DateTimeFormatter = {
+    new DateTimeFormatterBuilder()
+      .parseCaseInsensitive()
+      .appendPattern(pattern)
+      .parseDefaulting(ChronoField.ERA, 1)
+      .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
+      .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
+      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+      .toFormatter(locale)
+      .withChronology(IsoChronology.INSTANCE)
+      .withResolverStyle(ResolverStyle.STRICT)
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 81d7274..f01a769 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.util
 
 import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
+import java.time.Instant
 import java.util.{Calendar, Locale, TimeZone}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.{Function => JFunction}
@@ -50,7 +51,7 @@ object DateTimeUtils {
   final val MILLIS_PER_SECOND = 1000L
   final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
   final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY
-
+  final val NANOS_PER_MICROS = 1000L
   final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
 
   // number of days in 400 years
@@ -440,6 +441,18 @@ object DateTimeUtils {
     Some(c.getTimeInMillis * 1000 + segments(6))
   }
 
+  def instantToMicros(instant: Instant): Long = {
+    val sec = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND)
+    val result = Math.addExact(sec, instant.getNano / NANOS_PER_MICROS)
+    result
+  }
+
+  def instantToDays(instant: Instant): Int = {
+    val seconds = instant.getEpochSecond
+    val days = Math.floorDiv(seconds, SECONDS_PER_DAY)
+    days.toInt
+  }
+
   /**
    * Parses a given UTF8 date string to a corresponding [[Int]] value.
    * The return type is [[Option]] in order to distinguish between 0 and null. The following
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
new file mode 100644
index 0000000..4ec61e1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.text.ParseException
+import java.time._
+import java.time.format.DateTimeParseException
+import java.time.temporal.TemporalQueries
+import java.util.{Locale, TimeZone}
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.instantToMicros
+
+sealed trait TimestampFormatter extends Serializable {
+  /**
+   * Parses a timestamp in a string and converts it to microseconds.
+   *
+   * @param s - string with timestamp to parse
+   * @return microseconds since epoch.
+   * @throws ParseException can be thrown by legacy parser
+   * @throws DateTimeParseException can be thrown by new parser
+   * @throws DateTimeException unable to obtain local date or time
+   */
+  @throws(classOf[ParseException])
+  @throws(classOf[DateTimeParseException])
+  @throws(classOf[DateTimeException])
+  def parse(s: String): Long
+  def format(us: Long): String
+}
+
+class Iso8601TimestampFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper {
+  @transient
+  private lazy val formatter = getOrCreateFormatter(pattern, locale)
+
+  private def toInstant(s: String): Instant = {
+    val temporalAccessor = formatter.parse(s)
+    if (temporalAccessor.query(TemporalQueries.offset()) == null) {
+      toInstantWithZoneId(temporalAccessor, timeZone.toZoneId)
+    } else {
+      Instant.from(temporalAccessor)
+    }
+  }
+
+  override def parse(s: String): Long = instantToMicros(toInstant(s))
+
+  override def format(us: Long): String = {
+    val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND)
+    val mos = Math.floorMod(us, DateTimeUtils.MICROS_PER_SECOND)
+    val instant = Instant.ofEpochSecond(secs, mos * DateTimeUtils.NANOS_PER_MICROS)
+
+    formatter.withZone(timeZone.toZoneId).format(instant)
+  }
+}
+
+object TimestampFormatter {
+  val defaultPattern: String = "yyyy-MM-dd HH:mm:ss"
+  val defaultLocale: Locale = Locale.US
+
+  def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
+    new Iso8601TimestampFormatter(format, timeZone, locale)
+  }
+
+  def apply(format: String, timeZone: TimeZone): TimestampFormatter = {
+    apply(format, timeZone, defaultLocale)
+  }
+
+  def apply(timeZone: TimeZone): TimestampFormatter = {
+    apply(defaultPattern, timeZone, defaultLocale)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala
new file mode 100644
index 0000000..4d869d7
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/SQLHelper.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans
+
+import java.io.File
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+trait SQLHelper {
+
+  /**
+   * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
+   * configurations.
+   */
+  protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    val conf = SQLConf.get
+    val (keys, values) = pairs.unzip
+    val currentValues = keys.map { key =>
+      if (conf.contains(key)) {
+        Some(conf.getConfString(key))
+      } else {
+        None
+      }
+    }
+    (keys, values).zipped.foreach { (k, v) =>
+      if (SQLConf.staticConfKeys.contains(k)) {
+        throw new AnalysisException(s"Cannot modify the value of a static config: $k")
+      }
+      conf.setConfString(k, v)
+    }
+    try f finally {
+      keys.zip(currentValues).foreach {
+        case (key, Some(value)) => conf.setConfString(key, value)
+        case (key, None) => conf.unsetConf(key)
+      }
+    }
+  }
+
+  /**
+   * Generates a temporary path without creating the actual file/directory, then passes it to `f`. If
+   * a file/directory is created there by `f`, it will be deleted after `f` returns.
+   */
+  protected def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
index 0c1feb3..66d8d28 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeTestUtils.scala
@@ -26,6 +26,17 @@ object DateTimeTestUtils {
 
   val ALL_TIMEZONES: Seq[TimeZone] = TimeZone.getAvailableIDs.toSeq.map(TimeZone.getTimeZone)
 
+  val outstandingTimezonesIds: Seq[String] = Seq(
+    "UTC",
+    "PST",
+    "CET",
+    "Africa/Dakar",
+    "America/Los_Angeles",
+    "Antarctica/Vostok",
+    "Asia/Hong_Kong",
+    "Europe/Amsterdam")
+  val outstandingTimezones: Seq[TimeZone] = outstandingTimezonesIds.map(TimeZone.getTimeZone)
+
   def withDefaultTimeZone[T](newDefaultTimeZone: TimeZone)(block: => T): T = {
     val originalDefaultTimeZone = TimeZone.getDefault
     try {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
new file mode 100644
index 0000000..602542f
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import java.time.LocalDate
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
+
+class DateFormatterSuite extends SparkFunSuite with SQLHelper {
+  test("parsing dates") {
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+        val formatter = DateFormatter()
+        val daysSinceEpoch = formatter.parse("2018-12-02")
+        assert(daysSinceEpoch === 17867)
+      }
+    }
+  }
+
+  test("format dates") {
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+        val formatter = DateFormatter()
+        val date = formatter.format(17867)
+        assert(date === "2018-12-02")
+      }
+    }
+  }
+
+  test("roundtrip date -> days -> date") {
+    Seq(
+      "0050-01-01",
+      "0953-02-02",
+      "1423-03-08",
+      "1969-12-31",
+      "1972-08-25",
+      "1975-09-26",
+      "2018-12-12",
+      "2038-01-01",
+      "5010-11-17").foreach { date =>
+      DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+        withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+          val formatter = DateFormatter()
+          val days = formatter.parse(date)
+          val formatted = formatter.format(days)
+          assert(date === formatted)
+        }
+      }
+    }
+  }
+
+  test("roundtrip days -> date -> days") {
+    Seq(
+      -701265,
+      -371419,
+      -199722,
+      -1,
+      0,
+      967,
+      2094,
+      17877,
+      24837,
+      1110657).foreach { days =>
+      DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+        withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+          val formatter = DateFormatter()
+          val date = formatter.format(days)
+          val parsed = formatter.parse(date)
+          assert(days === parsed)
+        }
+      }
+    }
+  }
+
+  test("parsing date without explicit day") {
+    val formatter = DateFormatter("yyyy MMM")
+    val daysSinceEpoch = formatter.parse("2018 Dec")
+    assert(daysSinceEpoch === LocalDate.of(2018, 12, 1).toEpochDay)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
new file mode 100644
index 0000000..192ca13
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import java.time.{LocalDateTime, ZoneOffset}
+import java.util.TimeZone
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, TimestampFormatter}
+
+class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
+
+  test("parsing timestamps using time zones") {
+    val localDate = "2018-12-02T10:11:12.001234"
+    val expectedMicros = Map(
+      "UTC" -> 1543745472001234L,
+      "PST" -> 1543774272001234L,
+      "CET" -> 1543741872001234L,
+      "Africa/Dakar" -> 1543745472001234L,
+      "America/Los_Angeles" -> 1543774272001234L,
+      "Antarctica/Vostok" -> 1543723872001234L,
+      "Asia/Hong_Kong" -> 1543716672001234L,
+      "Europe/Amsterdam" -> 1543741872001234L)
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = TimestampFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+        TimeZone.getTimeZone(timeZone))
+      val microsSinceEpoch = formatter.parse(localDate)
+      assert(microsSinceEpoch === expectedMicros(timeZone))
+    }
+  }
+
+  test("format timestamps using time zones") {
+    val microsSinceEpoch = 1543745472001234L
+    val expectedTimestamp = Map(
+      "UTC" -> "2018-12-02T10:11:12.001234",
+      "PST" -> "2018-12-02T02:11:12.001234",
+      "CET" -> "2018-12-02T11:11:12.001234",
+      "Africa/Dakar" -> "2018-12-02T10:11:12.001234",
+      "America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
+      "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
+      "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
+      "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = TimestampFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+        TimeZone.getTimeZone(timeZone))
+      val timestamp = formatter.format(microsSinceEpoch)
+      assert(timestamp === expectedTimestamp(timeZone))
+    }
+  }
+
+  test("roundtrip micros -> timestamp -> micros using timezones") {
+    Seq("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXXXX").foreach { pattern =>
+      Seq(
+        -58710115316212000L,
+        -18926315945345679L,
+        -9463427405253013L,
+        -244000001L,
+        0L,
+        99628200102030L,
+        1543749753123456L,
+        2177456523456789L,
+        11858049903010203L).foreach { micros =>
+        DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+          val formatter = TimestampFormatter(pattern, timeZone)
+          val timestamp = formatter.format(micros)
+          val parsed = formatter.parse(timestamp)
+          assert(micros === parsed)
+        }
+      }
+    }
+  }
+
+  test("roundtrip timestamp -> micros -> timestamp using timezones") {
+    Seq(
+      "0109-07-20T18:38:03.788000",
+      "1370-04-01T10:00:54.654321",
+      "1670-02-11T14:09:54.746987",
+      "1969-12-31T23:55:55.999999",
+      "1970-01-01T00:00:00.000000",
+      "1973-02-27T02:30:00.102030",
+      "2018-12-02T11:22:33.123456",
+      "2039-01-01T01:02:03.456789",
+      "2345-10-07T22:45:03.010203").foreach { timestamp =>
+      DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+        val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone)
+        val micros = formatter.parse(timestamp)
+        val formatted = formatter.format(micros)
+        assert(timestamp === formatted)
+      }
+    }
+  }
+
+  test("case insensitive parsing of am and pm") {
+    val formatter = TimestampFormatter(
+      "yyyy MMM dd hh:mm:ss a",
+      TimeZone.getTimeZone("UTC"))
+    val micros = formatter.parse("2009 Mar 20 11:30:01 am")
+    assert(micros === TimeUnit.SECONDS.toMicros(
+      LocalDateTime.of(2009, 3, 20, 11, 30, 1).toEpochSecond(ZoneOffset.UTC)))
+  }
+}
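
For readers skimming the patch, the snippet below is an illustrative round trip through the two new formatter classes, reusing values from the test suites above. It is not part of the commit, and the wrapper object `FormatterExample` is made up for the example.

```scala
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}

object FormatterExample {
  def main(args: Array[String]): Unit = {
    // Dates travel as days since the epoch.
    val dateFormatter = DateFormatter()
    val days = dateFormatter.parse("2018-12-02")                  // 17867
    println(dateFormatter.format(days))                           // 2018-12-02

    // Timestamps travel as microseconds since the epoch, rendered in a given time zone.
    val tsFormatter = TimestampFormatter(
      "yyyy-MM-dd HH:mm:ss.SSSSSS", DateTimeUtils.TimeZoneUTC)
    val micros = tsFormatter.parse("2018-12-02 10:11:12.001234")  // 1543745472001234L
    println(tsFormatter.format(micros))                           // 2018-12-02 10:11:12.001234
  }
}
```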


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org