Posted to commits@spark.apache.org by we...@apache.org on 2020/03/05 08:00:17 UTC

[spark] branch branch-3.0 updated: [SPARK-30668][SQL][FOLLOWUP] Raise exception instead of silent change for new DateFormatter

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 253fbd5  [SPARK-30668][SQL][FOLLOWUP] Raise exception instead of silent change for new DateFormatter
253fbd5 is described below

commit 253fbd54e7fe3dda0672ebc7143b943af8387a2c
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Thu Mar 5 15:29:39 2020 +0800

    [SPARK-30668][SQL][FOLLOWUP] Raise exception instead of silent change for new DateFormatter
    
    This is a follow-up to #27441. For cases where the new TimestampFormatter returns null while the legacy formatter can return a value, we need to throw an exception instead of silently changing the result. The legacy config is referenced in the error message.
    
    Avoid silent result changes from the new behavior in 3.0.
    
    Yes, an exception is thrown when we detect that the legacy formatter can parse the string while the new formatter returns null.
    
    Extend existing UT.
    
    Closes #27537 from xuanyuanking/SPARK-30668-follow.
    
    Authored-by: Yuanjian Li <xy...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 7db0af578585ecaeee9fd23f8189292289b52a97)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
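
    To make the check concrete, here is a minimal standalone sketch of the
    pattern this commit introduces; parseNew and parseLegacy are illustrative
    stand-ins, not Spark's internals (the real logic lives in
    DateTimeFormatterHelper.checkDiffResult below).

        import java.text.SimpleDateFormat
        import java.time.LocalDate
        import java.time.format.{DateTimeFormatter, DateTimeParseException}

        object UpgradeCheckSketch {
          // New-style parser: stricter java.time formatter, as in Spark 3.0.
          def parseNew(s: String): Long =
            LocalDate.parse(s, DateTimeFormatter.ofPattern("uuuu-MM-dd")).toEpochDay

          // Legacy-style parser: lenient SimpleDateFormat, as in Spark 2.4.
          def parseLegacy(s: String): Long =
            new SimpleDateFormat("yyyy-MM-dd").parse(s).getTime / 86400000L

          def parse(s: String): Long =
            try parseNew(s) catch {
              case e: DateTimeParseException =>
                // If the legacy parser succeeds where the new one failed, the
                // result would silently change across the upgrade, so fail
                // loudly instead and point at the escape-hatch config.
                val legacyResult = try Some(parseLegacy(s)) catch { case _: Throwable => None }
                if (legacyResult.nonEmpty) {
                  throw new RuntimeException(
                    s"Fail to parse '$s' in the new parser; set " +
                      "spark.sql.legacy.timeParserPolicy to LEGACY to restore " +
                      "the behavior before Spark 3.0", e)
                } else {
                  throw e
                }
            }
        }

    For instance, "2015-07-32" rolls over to 2015-08-01 under the lenient
    legacy parser but is rejected by the new one, so parse("2015-07-32")
    throws rather than silently returning a different day.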
---
 .../scala/org/apache/spark/SparkException.scala    |  7 +++
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  2 +
 .../catalyst/expressions/datetimeExpressions.scala |  3 +
 .../spark/sql/catalyst/json/JacksonParser.scala    |  2 +
 .../spark/sql/catalyst/util/DateFormatter.scala    | 60 +++++++++++-------
 .../catalyst/util/DateTimeFormatterHelper.scala    | 26 +++++++-
 .../sql/catalyst/util/TimestampFormatter.scala     | 73 +++++++++++++---------
 .../org/apache/spark/sql/internal/SQLConf.scala    | 16 ++++-
 .../expressions/DateExpressionsSuite.scala         | 39 +++++++++---
 .../sql/catalyst/json/JsonInferSchemaSuite.scala   | 16 ++---
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   | 20 ++++--
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 58 ++++++++++-------
 .../adaptive/AdaptiveQueryExecSuite.scala          |  2 +-
 .../sql/execution/datasources/csv/CSVSuite.scala   | 22 ++++++-
 .../sql/execution/datasources/json/JsonSuite.scala | 22 ++++++-
 15 files changed, 271 insertions(+), 97 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index 4ad9a0c..81c087e 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -43,3 +43,10 @@ private[spark] case class SparkUserAppException(exitCode: Int)
  */
 private[spark] case class ExecutorDeadException(message: String)
   extends SparkException(message)
+
+/**
+ * Exception thrown when Spark returns a different result after upgrading to a new version.
+ */
+private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable)
+  extends SparkException("You may get a different result due to the upgrading of Spark" +
+    s" $version: $message", cause)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index f829e6b..dd8537b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
 
 import com.univocity.parsers.csv.CsvParser
 
+import org.apache.spark.SparkUpgradeException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
@@ -285,6 +286,7 @@ class UnivocityParser(
           }
         }
       } catch {
+        case e: SparkUpgradeException => throw e
         case NonFatal(e) =>
           badRecordException = badRecordException.orElse(Some(e))
           row.setNullAt(i)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 05074d9..d92d700 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.text.StringEscapeUtils
 
+import org.apache.spark.SparkUpgradeException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen._
@@ -787,6 +788,7 @@ abstract class ToTimestamp
               formatter.parse(
                 t.asInstanceOf[UTF8String].toString) / downScaleFactor
             } catch {
+              case e: SparkUpgradeException => throw e
               case NonFatal(_) => null
             }
           }
@@ -800,6 +802,7 @@ abstract class ToTimestamp
               TimestampFormatter(formatString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT)
                 .parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor
             } catch {
+              case e: SparkUpgradeException => throw e
               case NonFatal(_) => null
             }
           }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index da3b501..d0db06c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.core._
 
+import org.apache.spark.SparkUpgradeException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -382,6 +383,7 @@ class JacksonParser(
           try {
             row.update(index, fieldConverters(index).apply(parser))
           } catch {
+            case e: SparkUpgradeException => throw e
             case NonFatal(e) =>
               badRecordException = badRecordException.orElse(Some(e))
               parser.skipChildren()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index 2cf82d1..caf58c1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -23,8 +23,9 @@ import java.util.{Date, Locale}
 
 import org.apache.commons.lang3.time.FastDateFormat
 
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, localDateToDays}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 
 sealed trait DateFormatter extends Serializable {
   def parse(s: String): Int // returns days since epoch
@@ -34,16 +35,24 @@ sealed trait DateFormatter extends Serializable {
 class Iso8601DateFormatter(
     pattern: String,
     zoneId: ZoneId,
-    locale: Locale) extends DateFormatter with DateTimeFormatterHelper {
+    locale: Locale,
+    legacyFormat: LegacyDateFormats.LegacyDateFormat)
+  extends DateFormatter with DateTimeFormatterHelper {
 
   @transient
   private lazy val formatter = getOrCreateFormatter(pattern, locale)
 
+  @transient
+  private lazy val legacyFormatter = DateFormatter.getLegacyFormatter(
+    pattern, zoneId, locale, legacyFormat)
+
   override def parse(s: String): Int = {
     val specialDate = convertSpecialDate(s.trim, zoneId)
     specialDate.getOrElse {
-      val localDate = LocalDate.parse(s, formatter)
-      localDateToDays(localDate)
+      try {
+        val localDate = LocalDate.parse(s, formatter)
+        localDateToDays(localDate)
+      } catch checkDiffResult(s, legacyFormatter.parse)
     }
   }
 
@@ -87,33 +96,40 @@ object DateFormatter {
   val defaultLocale: Locale = Locale.US
 
   def defaultPattern(): String = {
-    if (SQLConf.get.legacyTimeParserEnabled) "yyyy-MM-dd" else "uuuu-MM-dd"
+    if (SQLConf.get.legacyTimeParserPolicy == LEGACY) "yyyy-MM-dd" else "uuuu-MM-dd"
   }
 
   private def getFormatter(
-    format: Option[String],
-    zoneId: ZoneId,
-    locale: Locale = defaultLocale,
-    legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = {
-
+      format: Option[String],
+      zoneId: ZoneId,
+      locale: Locale = defaultLocale,
+      legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = {
     val pattern = format.getOrElse(defaultPattern)
-    if (SQLConf.get.legacyTimeParserEnabled) {
-      legacyFormat match {
-        case FAST_DATE_FORMAT =>
-          new LegacyFastDateFormatter(pattern, locale)
-        case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT =>
-          new LegacySimpleDateFormatter(pattern, locale)
-      }
+    if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
+      getLegacyFormatter(pattern, zoneId, locale, legacyFormat)
     } else {
-      new Iso8601DateFormatter(pattern, zoneId, locale)
+      new Iso8601DateFormatter(pattern, zoneId, locale, legacyFormat)
+    }
+  }
+
+  def getLegacyFormatter(
+      pattern: String,
+      zoneId: ZoneId,
+      locale: Locale,
+      legacyFormat: LegacyDateFormat): DateFormatter = {
+    legacyFormat match {
+      case FAST_DATE_FORMAT =>
+        new LegacyFastDateFormatter(pattern, locale)
+      case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT =>
+        new LegacySimpleDateFormatter(pattern, locale)
     }
   }
 
   def apply(
-    format: String,
-    zoneId: ZoneId,
-    locale: Locale,
-    legacyFormat: LegacyDateFormat): DateFormatter = {
+      format: String,
+      zoneId: ZoneId,
+      locale: Locale,
+      legacyFormat: LegacyDateFormat): DateFormatter = {
     getFormatter(Some(format), zoneId, locale, legacyFormat)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
index a7b6309..33aa733 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
@@ -19,13 +19,16 @@ package org.apache.spark.sql.catalyst.util
 
 import java.time._
 import java.time.chrono.IsoChronology
-import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle}
+import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, DateTimeParseException, ResolverStyle}
 import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
 import java.util.Locale
 
 import com.google.common.cache.CacheBuilder
 
+import org.apache.spark.SparkUpgradeException
 import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 
 trait DateTimeFormatterHelper {
   // Converts the parsed temporal object to ZonedDateTime. It sets time components to zeros
@@ -57,6 +60,27 @@ trait DateTimeFormatterHelper {
     }
     formatter
   }
+
+  // When the legacy time parser policy is set to EXCEPTION, check whether we will get different
+  // results between the legacy parser and the new parser. If the new parser fails but the legacy
+  // parser works, throw a SparkUpgradeException. Conversely, when the policy is set to CORRECTED,
+  // the DateTimeParseException is left for the caller to handle.
+  protected def checkDiffResult[T](
+      s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = {
+    case e: DateTimeParseException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION =>
+      val res = try {
+        Some(legacyParseFunc(s))
+      } catch {
+        case _: Throwable => None
+      }
+      if (res.nonEmpty) {
+        throw new SparkUpgradeException("3.0", s"Fail to parse '$s' in the new parser. You can " +
+          s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior " +
+          s"before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.", e)
+      } else {
+        throw e
+      }
+  }
 }
 
 private object DateTimeFormatterHelper {
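
A note on the `} catch checkDiffResult(s, legacyFormatter.parse)` call sites: in
Scala, a catch clause may be any expression of type PartialFunction[Throwable, T],
not only an inline block of case clauses, which is what lets one handler be shared
by the date and timestamp formatters. A self-contained toy illustration (the names
here are made up, not Spark code):

    object CatchHandlerSketch {
      // Builds a reusable handler that carries its input for the error path.
      def fallback(input: String): PartialFunction[Throwable, Int] = {
        case e: NumberFormatException =>
          println(s"could not parse '$input': ${e.getMessage}")
          -1
      }

      // The catch clause is an expression, not an inline case block.
      def parse(s: String): Int = try s.toInt catch fallback(s)

      def main(args: Array[String]): Unit = {
        println(parse("42"))    // 42
        println(parse("oops"))  // -1, via the partial-function handler
      }
    }
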
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index fd77326..866f81b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -29,7 +29,9 @@ import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, LENIENT_SIMPLE_DATE_FORMAT}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.types.Decimal
 
 sealed trait TimestampFormatter extends Serializable {
@@ -52,21 +54,29 @@ sealed trait TimestampFormatter extends Serializable {
 class Iso8601TimestampFormatter(
     pattern: String,
     zoneId: ZoneId,
-    locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper {
+    locale: Locale,
+    legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT)
+  extends TimestampFormatter with DateTimeFormatterHelper {
   @transient
   protected lazy val formatter = getOrCreateFormatter(pattern, locale)
 
+  @transient
+  protected lazy val legacyFormatter = TimestampFormatter.getLegacyFormatter(
+    pattern, zoneId, locale, legacyFormat)
+
   override def parse(s: String): Long = {
     val specialDate = convertSpecialTimestamp(s.trim, zoneId)
     specialDate.getOrElse {
-      val parsed = formatter.parse(s)
-      val parsedZoneId = parsed.query(TemporalQueries.zone())
-      val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId
-      val zonedDateTime = toZonedDateTime(parsed, timeZoneId)
-      val epochSeconds = zonedDateTime.toEpochSecond
-      val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND)
-
-      Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond)
+      try {
+        val parsed = formatter.parse(s)
+        val parsedZoneId = parsed.query(TemporalQueries.zone())
+        val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId
+        val zonedDateTime = toZonedDateTime(parsed, timeZoneId)
+        val epochSeconds = zonedDateTime.toEpochSecond
+        val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND)
+
+        Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond)
+      } catch checkDiffResult(s, legacyFormatter.parse)
     }
   }
 
@@ -186,31 +196,38 @@ object TimestampFormatter {
   def defaultPattern(): String = s"${DateFormatter.defaultPattern()} HH:mm:ss"
 
   private def getFormatter(
-    format: Option[String],
-    zoneId: ZoneId,
-    locale: Locale = defaultLocale,
-    legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = {
-
+      format: Option[String],
+      zoneId: ZoneId,
+      locale: Locale = defaultLocale,
+      legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = {
     val pattern = format.getOrElse(defaultPattern)
-    if (SQLConf.get.legacyTimeParserEnabled) {
-      legacyFormat match {
-        case FAST_DATE_FORMAT =>
-          new LegacyFastTimestampFormatter(pattern, zoneId, locale)
-        case SIMPLE_DATE_FORMAT =>
-          new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = false)
-        case LENIENT_SIMPLE_DATE_FORMAT =>
-          new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = true)
-      }
+    if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
+      getLegacyFormatter(pattern, zoneId, locale, legacyFormat)
     } else {
-      new Iso8601TimestampFormatter(pattern, zoneId, locale)
+      new Iso8601TimestampFormatter(pattern, zoneId, locale, legacyFormat)
+    }
+  }
+
+  def getLegacyFormatter(
+      pattern: String,
+      zoneId: ZoneId,
+      locale: Locale,
+      legacyFormat: LegacyDateFormat): TimestampFormatter = {
+    legacyFormat match {
+      case FAST_DATE_FORMAT =>
+        new LegacyFastTimestampFormatter(pattern, zoneId, locale)
+      case SIMPLE_DATE_FORMAT =>
+        new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = false)
+      case LENIENT_SIMPLE_DATE_FORMAT =>
+        new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = true)
     }
   }
 
   def apply(
-    format: String,
-    zoneId: ZoneId,
-    locale: Locale,
-    legacyFormat: LegacyDateFormat): TimestampFormatter = {
+      format: String,
+      zoneId: ZoneId,
+      locale: Locale,
+      legacyFormat: LegacyDateFormat): TimestampFormatter = {
     getFormatter(Some(format), zoneId, locale, legacyFormat)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7718dd2..e39d066 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2116,6 +2116,18 @@ object SQLConf {
     .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
     .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
 
+  val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy")
+    .internal()
+    .doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " +
+      "dates/timestamps in a locale-sensitive manner, which is the approach before Spark 3.0. " +
+      "When set to CORRECTED, classes from java.time.* packages are used for the same purpose. " +
+      "The default value is EXCEPTION, RuntimeException is thrown when we will get different " +
+      "results.")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+    .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
   val LEGACY_ARRAY_EXISTS_FOLLOWS_THREE_VALUED_LOGIC =
     buildConf("spark.sql.legacy.followThreeValuedLogicInArrayExists")
       .internal()
@@ -2496,7 +2508,9 @@ class SQLConf extends Serializable with Logging {
   def legacyMsSqlServerNumericMappingEnabled: Boolean =
     getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED)
 
-  def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED)
+  def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = {
+    LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY))
+  }
 
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
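
The new entry follows a common SQLConf pattern for enum-valued configs: store the
value as an upper-cased string, validate it with checkValues, and convert it back
with withName on read. A standalone sketch of that round trip, using a plain Scala
Enumeration and no Spark dependency:

    object PolicySketch {
      object LegacyBehaviorPolicy extends Enumeration {
        val EXCEPTION, CORRECTED, LEGACY = Value
      }

      // Mimics the stringConf + transform(_.toUpperCase) + checkValues + withName chain.
      def readPolicy(raw: String): LegacyBehaviorPolicy.Value = {
        val normalized = raw.toUpperCase(java.util.Locale.ROOT)
        require(LegacyBehaviorPolicy.values.map(_.toString).contains(normalized),
          s"invalid value for timeParserPolicy: $raw")
        LegacyBehaviorPolicy.withName(normalized)
      }
    }

    // PolicySketch.readPolicy("legacy") == PolicySketch.LegacyBehaviorPolicy.LEGACY
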
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index f1ad753..4a04fbb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -23,7 +23,7 @@ import java.time.{Instant, LocalDate, LocalDateTime, ZoneId, ZoneOffset}
 import java.util.{Calendar, Locale, TimeZone}
 import java.util.concurrent.TimeUnit._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUpgradeException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils, TimestampFormatter}
@@ -241,8 +241,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("DateFormat") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         checkEvaluation(
           DateFormatClass(Literal.create(null, TimestampType), Literal("y"), gmtId),
           null)
@@ -710,8 +710,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("from_unixtime") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val fmt1 = "yyyy-MM-dd HH:mm:ss"
         val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
         val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
@@ -758,8 +758,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("unix_timestamp") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
         val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
         val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
@@ -824,8 +824,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("to_unix_timestamp") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val fmt1 = "yyyy-MM-dd HH:mm:ss"
         val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
         val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
@@ -1164,4 +1164,25 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         Literal(LocalDate.of(1, 1, 1))),
       IntervalUtils.stringToInterval(UTF8String.fromString("interval 9999 years")))
   }
+
+  test("to_timestamp exception mode") {
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
+      checkEvaluation(
+        GetTimestamp(
+          Literal("2020-01-27T20:06:11.847-0800"),
+          Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), 1580184371847000L)
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+      checkEvaluation(
+        GetTimestamp(
+          Literal("2020-01-27T20:06:11.847-0800"),
+          Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), null)
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
+      checkExceptionInExpression[SparkUpgradeException](
+        GetTimestamp(
+          Literal("2020-01-27T20:06:11.847-0800"),
+          Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), "Fail to parse")
+    }
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
index c2e03bd..bce917c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
@@ -40,8 +40,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("inferring timestamp type") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         checkTimestampType("yyyy", """{"a": "2018"}""")
         checkTimestampType("yyyy=MM", """{"a": "2018=12"}""")
         checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""")
@@ -56,8 +56,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("prefer decimals over timestamps") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParser) {
         checkType(
           options = Map(
             "prefersDecimal" -> "true",
@@ -71,8 +71,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("skip decimal type inferring") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         checkType(
           options = Map(
             "prefersDecimal" -> "false",
@@ -86,8 +86,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("fallback to string type") {
-    Seq(true, false).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         checkType(
           options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"),
           json = """{"a": "20181202.210400123"}""",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 61f0e13..89fb4d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -59,10 +59,22 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
     val df2 = df
       .select(from_csv($"value", schemaWithCorrField1, Map(
         "mode" -> "Permissive", "columnNameOfCorruptRecord" -> columnNameOfCorruptRecord)))
-
-    checkAnswer(df2, Seq(
-      Row(Row(0, null, "0,2013-111-11 12:13:14")),
-      Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+      checkAnswer(df2, Seq(
+        Row(Row(0, null, "0,2013-111-11 12:13:14")),
+        Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
+      checkAnswer(df2, Seq(
+        Row(Row(0, java.sql.Date.valueOf("2022-03-11"), null)),
+        Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
+      val msg = intercept[SparkException] {
+        df2.collect()
+      }.getCause.getMessage
+      assert(msg.contains("Fail to parse"))
+    }
   }
 
   test("schema_of_csv - infers schemas") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 527dfe5..da7cc1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -23,7 +23,8 @@ import java.time.{Instant, LocalDateTime}
 import java.util.{Locale, TimeZone}
 import java.util.concurrent.TimeUnit
 
-import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -96,8 +97,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
   }
 
   test("date format") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val df = Seq((d, sdf.format(d), ts)).toDF("a", "b", "c")
 
         checkAnswer(
@@ -377,6 +378,13 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
       Seq(Row(Date.valueOf("2015-07-30")), Row(Date.valueOf("2015-07-30"))))
   }
 
+  def checkExceptionMessage(df: DataFrame): Unit = {
+    val message = intercept[SparkException] {
+      df.collect()
+    }.getCause.getMessage
+    assert(message.contains("Fail to parse"))
+  }
+
   test("function to_date") {
     val d1 = Date.valueOf("2015-07-22")
     val d2 = Date.valueOf("2015-07-01")
@@ -422,9 +430,15 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
       df.select(to_date(col("d"), "yyyy-MM-dd")),
       Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01")),
         Row(Date.valueOf("2014-12-31"))))
-    checkAnswer(
-      df.select(to_date(col("s"), "yyyy-MM-dd")),
-      Seq(Row(null), Row(Date.valueOf("2014-12-31")), Row(null)))
+    val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key
+    withSQLConf(confKey -> "corrected") {
+      checkAnswer(
+        df.select(to_date(col("s"), "yyyy-MM-dd")),
+        Seq(Row(null), Row(Date.valueOf("2014-12-31")), Row(null)))
+    }
+    withSQLConf(confKey -> "exception") {
+      checkExceptionMessage(df.select(to_date(col("s"), "yyyy-MM-dd")))
+    }
 
     // now switch format
     checkAnswer(
@@ -529,8 +543,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
   }
 
   test("from_unixtime") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("corrected", "legacy").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
         val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
         val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
@@ -562,8 +576,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
   private def secs(millis: Long): Long = TimeUnit.MILLISECONDS.toSeconds(millis)
 
   test("unix_timestamp") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("corrected", "legacy").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val date1 = Date.valueOf("2015-07-24")
         val date2 = Date.valueOf("2015-07-25")
         val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
@@ -629,8 +643,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
   }
 
   test("to_unix_timestamp") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("corrected", "legacy").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val date1 = Date.valueOf("2015-07-24")
         val date2 = Date.valueOf("2015-07-25")
         val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
@@ -680,8 +694,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
 
 
   test("to_timestamp") {
-    Seq(false, true).foreach { legacyParser =>
-      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+    Seq("legacy", "corrected").foreach { legacyParserPolicy =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) {
         val date1 = Date.valueOf("2015-07-24")
         val date2 = Date.valueOf("2015-07-25")
         val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00")
@@ -701,7 +715,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
           df.select(unix_timestamp(col("ss")).cast("timestamp")))
         checkAnswer(df.select(to_timestamp(col("ss"))), Seq(
           Row(ts1), Row(ts2)))
-        if (legacyParser) {
+        if (legacyParserPolicy == "legacy") {
           // In Spark 2.4 and earlier, to_timestamp() parses in seconds precision and cuts off
           // the fractional part of seconds. The behavior was changed by SPARK-27438.
           val legacyFmt = "yyyy/MM/dd HH:mm:ss"
@@ -819,16 +833,18 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-30668: use legacy timestamp parser in to_timestamp") {
-    def checkTimeZoneParsing(expected: Any): Unit = {
-      val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts")
+    val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key
+    val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts")
+    withSQLConf(confKey -> "legacy") {
+      val expected = Timestamp.valueOf("2020-01-27 20:06:11.847")
       checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")),
         Row(expected))
     }
-    withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") {
-      checkTimeZoneParsing(Timestamp.valueOf("2020-01-27 20:06:11.847"))
+    withSQLConf(confKey -> "corrected") {
+      checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")), Row(null))
     }
-    withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "false") {
-      checkTimeZoneParsing(null)
+    withSQLConf(confKey -> "exception") {
+      checkExceptionMessage(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 500b6cc..e5f1fa6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -609,7 +609,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "2000") {
+      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "2000") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 43553df..30ae9dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2307,6 +2307,26 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
     val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds)
     checkAnswer(csv, Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12")))
   }
+
+  test("exception mode for parsing date/timestamp string") {
+    val ds = Seq("2020-01-27T20:06:11.847-0800").toDS()
+    val csv = spark.read
+      .option("header", false)
+      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSz")
+      .schema("t timestamp").csv(ds)
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
+      val msg = intercept[SparkException] {
+        csv.collect()
+      }.getCause.getMessage
+      assert(msg.contains("Fail to parse"))
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
+      checkAnswer(csv, Row(Timestamp.valueOf("2020-01-27 20:06:11.847")))
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+      checkAnswer(csv, Row(null))
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
@@ -2327,5 +2347,5 @@ class CSVLegacyTimeParserSuite extends CSVSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true)
+      .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy")
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index d2ce555..df0bca7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2669,6 +2669,26 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
       Date.valueOf("2020-1-12"),
       Date.valueOf(LocalDate.ofEpochDay(12345))))
   }
+
+  test("exception mode for parsing date/timestamp string") {
+    val ds = Seq("{'t': '2020-01-27T20:06:11.847-0800'}").toDS()
+    val json = spark.read
+      .schema("t timestamp")
+      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSz")
+      .json(ds)
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
+      val msg = intercept[SparkException] {
+        json.collect()
+      }.getCause.getMessage
+      assert(msg.contains("Fail to parse"))
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
+      checkAnswer(json, Row(Timestamp.valueOf("2020-01-27 20:06:11.847")))
+    }
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") {
+      checkAnswer(json, Row(null))
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {
@@ -2689,5 +2709,5 @@ class JsonLegacyTimeParserSuite extends JsonSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true)
+      .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy")
 }
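
For end users who hit the new SparkUpgradeException, the remediation is the one
spelled out in the error message. Below is a hedged usage sketch (spark-shell
style) built from standard SparkSession calls and the test input used above; the
behavior per policy value mirrors the suites in this patch:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, to_timestamp}

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts")
      .select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz"))

    // Default EXCEPTION policy: collect()/show() fails with SparkUpgradeException,
    // because the legacy parser accepts this string and the new parser does not.

    // Restore the pre-3.0 SimpleDateFormat behavior:
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
    df.show()  // 2020-01-27 20:06:11.847

    // Or accept the new java.time behavior; unparseable strings become null:
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")
    df.show()  // null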


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org