Posted to commits@spark.apache.org by we...@apache.org on 2020/05/21 04:04:31 UTC

[spark] branch branch-3.0 updated: [SPARK-31762][SQL] Fix perf regression of date/timestamp formatting in toHiveString

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b4df7b5  [SPARK-31762][SQL] Fix perf regression of date/timestamp formatting in toHiveString
b4df7b5 is described below

commit b4df7b57be33baee8fea98eb2ff85fe77b9aeef0
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Thu May 21 04:01:19 2020 +0000

    [SPARK-31762][SQL] Fix perf regression of date/timestamp formatting in toHiveString
    
    ### What changes were proposed in this pull request?
    1. Add new methods that accept Java date-time types to the `DateFormatter` and `TimestampFormatter` traits. The methods format input date-time instances as strings (see the sketch after this list):
        - TimestampFormatter:
          - `def format(ts: Timestamp): String`
          - `def format(instant: Instant): String`
        - DateFormatter:
          - `def format(date: Date): String`
          - `def format(localDate: LocalDate): String`
    2. Re-use the added methods in `HiveResult.toHiveString`.
    3. Borrow the code for formatting `java.sql.Timestamp` from Spark 2.4's `DateTimeUtils.timestampToString` into `FractionTimestampFormatter`, because the legacy formatters don't support variable-length patterns for second fractions.
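
    A minimal sketch (not part of the patch) of how the new overloads can be called; the zone and sample values here are illustrative, and the `Timestamp` case additionally assumes a UTC default JVM time zone:

    ```scala
    import java.sql.{Date, Timestamp}
    import java.time.{Instant, LocalDate, ZoneOffset}

    import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}

    // The new overloads take Java date-time values directly,
    // skipping the conversion to days/micros and back.
    val dateFormatter = DateFormatter(ZoneOffset.UTC)
    dateFormatter.format(Date.valueOf("2020-05-21"))  // "2020-05-21"
    dateFormatter.format(LocalDate.of(2020, 5, 21))   // "2020-05-21"

    val tsFormatter = TimestampFormatter.getFractionFormatter(ZoneOffset.UTC)
    // The fraction formatter omits trailing zeros in the second fraction.
    tsFormatter.format(Instant.parse("2020-05-21T04:01:19.100Z"))  // "2020-05-21 04:01:19.1"
    tsFormatter.format(Timestamp.valueOf("2020-05-21 04:01:19.1")) // "2020-05-21 04:01:19.1"
    ```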
    
    ### Why are the changes needed?
    To avoid the unnecessary overhead of converting Java date-time types to micros/days before formatting. Without the new methods, the formatters have to convert the input micros/days back to Java types just to pass the instances to the standard library API.
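
    For illustration, this is the round trip that `HiveResult.toHiveString` no longer performs (both lines appear in the diff below):

    ```scala
    // Before: convert the Java value to micros, which the formatter
    // immediately turns back into an Instant before formatting.
    timestampFormatter.format(DateTimeUtils.fromJavaTimestamp(t))
    // After: format the Java value directly.
    timestampFormatter.format(t)
    ```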
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    By existing tests for `toHiveString` and new tests in `TimestampFormatterSuite`.
    
    Closes #28582 from MaxGekk/opt-format-old-types.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 5d673319af81bb826e5f532b5ff25f2d4b2da122)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/util/DateFormatter.scala    | 25 +++++--
 .../sql/catalyst/util/TimestampFormatter.scala     | 52 ++++++++++++--
 .../apache/spark/sql/util/DateFormatterSuite.scala | 16 +++--
 .../spark/sql/util/TimestampFormatterSuite.scala   | 84 ++++++++++++++--------
 .../apache/spark/sql/execution/HiveResult.scala    | 11 ++-
 .../sql-tests/results/postgreSQL/date.sql.out      |  2 +-
 6 files changed, 138 insertions(+), 52 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index 0f79c1a..7d94955 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -29,7 +29,10 @@ import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 
 sealed trait DateFormatter extends Serializable {
   def parse(s: String): Int // returns days since epoch
+
   def format(days: Int): String
+  def format(date: Date): String
+  def format(localDate: LocalDate): String
 }
 
 class Iso8601DateFormatter(
@@ -56,22 +59,32 @@ class Iso8601DateFormatter(
     }
   }
 
+  override def format(localDate: LocalDate): String = {
+    localDate.format(formatter)
+  }
+
   override def format(days: Int): String = {
-    LocalDate.ofEpochDay(days).format(formatter)
+    format(LocalDate.ofEpochDay(days))
+  }
+
+  override def format(date: Date): String = {
+    legacyFormatter.format(date)
   }
 }
 
 trait LegacyDateFormatter extends DateFormatter {
   def parseToDate(s: String): Date
-  def formatDate(d: Date): String
 
   override def parse(s: String): Int = {
     fromJavaDate(new java.sql.Date(parseToDate(s).getTime))
   }
 
   override def format(days: Int): String = {
-    val date = DateTimeUtils.toJavaDate(days)
-    formatDate(date)
+    format(DateTimeUtils.toJavaDate(days))
+  }
+
+  override def format(localDate: LocalDate): String = {
+    format(localDateToDays(localDate))
   }
 }
 
@@ -79,14 +92,14 @@ class LegacyFastDateFormatter(pattern: String, locale: Locale) extends LegacyDat
   @transient
   private lazy val fdf = FastDateFormat.getInstance(pattern, locale)
   override def parseToDate(s: String): Date = fdf.parse(s)
-  override def formatDate(d: Date): String = fdf.format(d)
+  override def format(d: Date): String = fdf.format(d)
 }
 
 class LegacySimpleDateFormatter(pattern: String, locale: Locale) extends LegacyDateFormatter {
   @transient
   private lazy val sdf = new SimpleDateFormat(pattern, locale)
   override def parseToDate(s: String): Date = sdf.parse(s)
-  override def formatDate(d: Date): String = sdf.format(d)
+  override def format(d: Date): String = sdf.format(d)
 }
 
 object DateFormatter {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 47d6b47..d7d6368 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -50,7 +50,10 @@ sealed trait TimestampFormatter extends Serializable {
   @throws(classOf[DateTimeParseException])
   @throws(classOf[DateTimeException])
   def parse(s: String): Long
+
   def format(us: Long): String
+  def format(ts: Timestamp): String
+  def format(instant: Instant): String
 }
 
 class Iso8601TimestampFormatter(
@@ -84,9 +87,17 @@ class Iso8601TimestampFormatter(
     }
   }
 
+  override def format(instant: Instant): String = {
+    formatter.withZone(zoneId).format(instant)
+  }
+
   override def format(us: Long): String = {
     val instant = DateTimeUtils.microsToInstant(us)
-    formatter.withZone(zoneId).format(instant)
+    format(instant)
+  }
+
+  override def format(ts: Timestamp): String = {
+    legacyFormatter.format(ts)
   }
 }
 
@@ -100,10 +111,27 @@ class Iso8601TimestampFormatter(
  */
 class FractionTimestampFormatter(zoneId: ZoneId)
   extends Iso8601TimestampFormatter(
-    "", zoneId, TimestampFormatter.defaultLocale, needVarLengthSecondFraction = false) {
+    TimestampFormatter.defaultPattern,
+    zoneId,
+    TimestampFormatter.defaultLocale,
+    needVarLengthSecondFraction = false) {
 
   @transient
   override protected lazy val formatter = DateTimeFormatterHelper.fractionFormatter
+
+  // The new formatter will omit the trailing 0 in the timestamp string, but the legacy formatter
+  // can't. Here we borrow the code from Spark 2.4 DateTimeUtils.timestampToString to omit the
+  // trailing 0 for the legacy formatter as well.
+  override def format(ts: Timestamp): String = {
+    val timestampString = ts.toString
+    val formatted = legacyFormatter.format(ts)
+
+    if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
+      formatted + timestampString.substring(19)
+    } else {
+      formatted
+    }
+  }
 }
 
 /**
@@ -149,7 +177,7 @@ class LegacyFastTimestampFormatter(
     fastDateFormat.getTimeZone,
     fastDateFormat.getPattern.count(_ == 'S'))
 
-  def parse(s: String): SQLTimestamp = {
+  override def parse(s: String): SQLTimestamp = {
     cal.clear() // Clear the calendar because it can be re-used many times
     if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) {
       throw new IllegalArgumentException(s"'$s' is an invalid timestamp")
@@ -160,12 +188,20 @@ class LegacyFastTimestampFormatter(
     rebaseJulianToGregorianMicros(julianMicros)
   }
 
-  def format(timestamp: SQLTimestamp): String = {
+  override def format(timestamp: SQLTimestamp): String = {
     val julianMicros = rebaseGregorianToJulianMicros(timestamp)
     cal.setTimeInMillis(Math.floorDiv(julianMicros, MICROS_PER_SECOND) * MILLIS_PER_SECOND)
     cal.setMicros(Math.floorMod(julianMicros, MICROS_PER_SECOND))
     fastDateFormat.format(cal)
   }
+
+  override def format(ts: Timestamp): String = {
+    format(fromJavaTimestamp(ts))
+  }
+
+  override def format(instant: Instant): String = {
+    format(instantToMicros(instant))
+  }
 }
 
 class LegacySimpleTimestampFormatter(
@@ -187,6 +223,14 @@ class LegacySimpleTimestampFormatter(
   override def format(us: Long): String = {
     sdf.format(toJavaTimestamp(us))
   }
+
+  override def format(ts: Timestamp): String = {
+    sdf.format(ts)
+  }
+
+  override def format(instant: Instant): String = {
+    format(instantToMicros(instant))
+  }
 }
 
 object LegacyDateFormats extends Enumeration {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
index 5e2b6a7..3954b9b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
@@ -22,7 +22,7 @@ import java.time.{DateTimeException, LocalDate, ZoneOffset}
 import org.apache.spark.{SparkFunSuite, SparkUpgradeException}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, localDateToDays}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 
@@ -41,8 +41,11 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
     DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
       withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
         val formatter = DateFormatter(getZoneId(timeZone))
-        val date = formatter.format(17867)
-        assert(date === "2018-12-02")
+        val (days, expected) = (17867, "2018-12-02")
+        val date = formatter.format(days)
+        assert(date === expected)
+        assert(formatter.format(daysToLocalDate(days)) === expected)
+        assert(formatter.format(toJavaDate(days)) === expected)
       }
     }
   }
@@ -70,8 +73,9 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
                   DateFormatter.defaultLocale,
                   legacyFormat)
                 val days = formatter.parse(date)
-                val formatted = formatter.format(days)
-                assert(date === formatted)
+                assert(date === formatter.format(days))
+                assert(date === formatter.format(daysToLocalDate(days)))
+                assert(date === formatter.format(toJavaDate(days)))
               }
             }
           }
@@ -170,7 +174,9 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
               DateFormatter.defaultLocale,
               legacyFormat)
             assert(LocalDate.ofEpochDay(formatter.parse("1000-01-01")) === LocalDate.of(1000, 1, 1))
+            assert(formatter.format(LocalDate.of(1000, 1, 1)) === "1000-01-01")
             assert(formatter.format(localDateToDays(LocalDate.of(1000, 1, 1))) === "1000-01-01")
+            assert(formatter.format(java.sql.Date.valueOf("1000-01-01")) === "1000-01-01")
           }
         }
       }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
index 5d27a6b..b467e24 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
@@ -57,20 +57,29 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper with Matchers
   test("format timestamps using time zones") {
     val microsSinceEpoch = 1543745472001234L
     val expectedTimestamp = Map(
-      "UTC" -> "2018-12-02T10:11:12.001234",
-      PST.getId -> "2018-12-02T02:11:12.001234",
-      CET.getId -> "2018-12-02T11:11:12.001234",
-      "Africa/Dakar" -> "2018-12-02T10:11:12.001234",
-      "America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
-      "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
-      "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
-      "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
+      "UTC" -> "2018-12-02 10:11:12.001234",
+      PST.getId -> "2018-12-02 02:11:12.001234",
+      CET.getId -> "2018-12-02 11:11:12.001234",
+      "Africa/Dakar" -> "2018-12-02 10:11:12.001234",
+      "America/Los_Angeles" -> "2018-12-02 02:11:12.001234",
+      "Antarctica/Vostok" -> "2018-12-02 16:11:12.001234",
+      "Asia/Hong_Kong" -> "2018-12-02 18:11:12.001234",
+      "Europe/Amsterdam" -> "2018-12-02 11:11:12.001234")
     DateTimeTestUtils.outstandingTimezonesIds.foreach { zoneId =>
-      val formatter = TimestampFormatter(
-        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
-        DateTimeUtils.getZoneId(zoneId))
-      val timestamp = formatter.format(microsSinceEpoch)
-      assert(timestamp === expectedTimestamp(zoneId))
+      Seq(
+        TimestampFormatter(
+          "yyyy-MM-dd HH:mm:ss.SSSSSS",
+          getZoneId(zoneId),
+          // Test only FAST_DATE_FORMAT because other legacy formats don't support formatting
+          // in microsecond precision.
+          LegacyDateFormats.FAST_DATE_FORMAT,
+          needVarLengthSecondFraction = false),
+        TimestampFormatter.getFractionFormatter(getZoneId(zoneId))).foreach { formatter =>
+        val timestamp = formatter.format(microsSinceEpoch)
+        assert(timestamp === expectedTimestamp(zoneId))
+        assert(formatter.format(microsToInstant(microsSinceEpoch)) === expectedTimestamp(zoneId))
+        assert(formatter.format(toJavaTimestamp(microsSinceEpoch)) === expectedTimestamp(zoneId))
+      }
     }
   }
 
@@ -125,20 +134,30 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper with Matchers
   }
 
   test("format fraction of second") {
-    val formatter = TimestampFormatter.getFractionFormatter(ZoneOffset.UTC)
-    assert(formatter.format(0) === "1970-01-01 00:00:00")
-    assert(formatter.format(1) === "1970-01-01 00:00:00.000001")
-    assert(formatter.format(1000) === "1970-01-01 00:00:00.001")
-    assert(formatter.format(900000) === "1970-01-01 00:00:00.9")
-    assert(formatter.format(1000000) === "1970-01-01 00:00:01")
+    val formatter = TimestampFormatter.getFractionFormatter(UTC)
+    Seq(
+      0 -> "1970-01-01 00:00:00",
+      1 -> "1970-01-01 00:00:00.000001",
+      1000 -> "1970-01-01 00:00:00.001",
+      900000 -> "1970-01-01 00:00:00.9",
+      1000000 -> "1970-01-01 00:00:01").foreach { case (micros, tsStr) =>
+      assert(formatter.format(micros) === tsStr)
+      assert(formatter.format(microsToInstant(micros)) === tsStr)
+      DateTimeTestUtils.withDefaultTimeZone(UTC) {
+        assert(formatter.format(toJavaTimestamp(micros)) === tsStr)
+      }
+    }
   }
 
   test("formatting negative years with default pattern") {
-    val instant = LocalDateTime.of(-99, 1, 1, 0, 0, 0)
-      .atZone(ZoneOffset.UTC)
-      .toInstant
+    val instant = LocalDateTime.of(-99, 1, 1, 0, 0, 0).atZone(UTC).toInstant
     val micros = DateTimeUtils.instantToMicros(instant)
-    assert(TimestampFormatter(ZoneOffset.UTC).format(micros) === "-0099-01-01 00:00:00")
+    assert(TimestampFormatter(UTC).format(micros) === "-0099-01-01 00:00:00")
+    assert(TimestampFormatter(UTC).format(instant) === "-0099-01-01 00:00:00")
+    DateTimeTestUtils.withDefaultTimeZone(UTC) { // toJavaTimestamp depends on the default time zone
+      assert(TimestampFormatter("yyyy-MM-dd HH:mm:SS G", UTC).format(toJavaTimestamp(micros))
+        === "0100-01-01 00:00:00 BC")
+    }
   }
 
   test("special timestamp values") {
@@ -266,24 +285,31 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper with Matchers
 
   test("SPARK-31557: rebasing in legacy formatters/parsers") {
     withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> LegacyBehaviorPolicy.LEGACY.toString) {
-      LegacyDateFormats.values.foreach { legacyFormat =>
-        DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
-          withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> zoneId.getId) {
-            DateTimeTestUtils.withDefaultTimeZone(zoneId) {
-              withClue(s"${zoneId.getId} legacyFormat = $legacyFormat") {
-                val formatter = TimestampFormatter(
+      DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+        withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> zoneId.getId) {
+          DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+            withClue(s"zoneId = ${zoneId.getId}") {
+              val formatters = LegacyDateFormats.values.map { legacyFormat =>
+                TimestampFormatter(
                   TimestampFormatter.defaultPattern,
                   zoneId,
                   TimestampFormatter.defaultLocale,
                   legacyFormat,
                   needVarLengthSecondFraction = false)
+              }.toSeq :+  TimestampFormatter.getFractionFormatter(zoneId)
+              formatters.foreach { formatter =>
                 assert(microsToInstant(formatter.parse("1000-01-01 01:02:03"))
                   .atZone(zoneId)
                   .toLocalDateTime === LocalDateTime.of(1000, 1, 1, 1, 2, 3))
 
+                assert(formatter.format(
+                  LocalDateTime.of(1000, 1, 1, 1, 2, 3).atZone(zoneId).toInstant) ===
+                  "1000-01-01 01:02:03")
                 assert(formatter.format(instantToMicros(
                   LocalDateTime.of(1000, 1, 1, 1, 2, 3)
                     .atZone(zoneId).toInstant)) === "1000-01-01 01:02:03")
+                assert(formatter.format(java.sql.Timestamp.valueOf("1000-01-01 01:02:03")) ===
+                  "1000-01-01 01:02:03")
               }
             }
           }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index 1a84db1..73484a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -80,13 +80,10 @@ object HiveResult {
   def toHiveString(a: (Any, DataType), nested: Boolean = false): String = a match {
     case (null, _) => if (nested) "null" else "NULL"
     case (b, BooleanType) => b.toString
-    case (d: Date, DateType) => dateFormatter.format(DateTimeUtils.fromJavaDate(d))
-    case (ld: LocalDate, DateType) =>
-      dateFormatter.format(DateTimeUtils.localDateToDays(ld))
-    case (t: Timestamp, TimestampType) =>
-      timestampFormatter.format(DateTimeUtils.fromJavaTimestamp(t))
-    case (i: Instant, TimestampType) =>
-      timestampFormatter.format(DateTimeUtils.instantToMicros(i))
+    case (d: Date, DateType) => dateFormatter.format(d)
+    case (ld: LocalDate, DateType) => dateFormatter.format(ld)
+    case (t: Timestamp, TimestampType) => timestampFormatter.format(t)
+    case (i: Instant, TimestampType) => timestampFormatter.format(i)
     case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
     case (decimal: java.math.BigDecimal, DecimalType()) => decimal.toPlainString
     case (n, _: NumericType) => n.toString
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/date.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/date.sql.out
index 1d862ba..151fa1e 100755
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/date.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/date.sql.out
@@ -584,7 +584,7 @@ select make_date(-44, 3, 15)
 -- !query schema
 struct<make_date(-44, 3, 15):date>
 -- !query output
--0044-03-15
+0045-03-15
 
 
 -- !query

