You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/05/17 02:36:08 UTC

[spark] branch branch-3.0 updated: [SPARK-31405][SQL][3.0] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new afe2247  [SPARK-31405][SQL][3.0] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
afe2247 is described below

commit afe2247cffda4dc46c41e3db9d1dc9853beadd28
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Sun May 17 02:32:39 2020 +0000

    [SPARK-31405][SQL][3.0] Fail by default when reading/writing legacy datetime values from/to Parquet/Avro files
    
    ### What changes were proposed in this pull request?
    
    When reading/writing datetime values that before the rebase switch day, from/to Avro/Parquet files, fail by default and ask users to set a config to explicitly do rebase or not.
    
    ### Why are the changes needed?
    
    Rebase or not rebase have different behaviors and we should let users decide it explicitly. In most cases, users won't hit this exception as it only affects ancient datetime values.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, now users will see an error when reading/writing dates before 1582-10-15 or timestamps before 1900-01-01 from/to Parquet/Avro files, with an error message to ask setting a config.
    
    ### How was this patch tested?
    
    updated tests
    
    Closes #28526 from cloud-fan/backport.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../scala/org/apache/spark/SparkException.scala    |   2 +-
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  37 ++++----
 .../org/apache/spark/sql/avro/AvroFileFormat.scala |  10 +-
 .../apache/spark/sql/avro/AvroOutputWriter.scala   |  13 ++-
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  30 +++---
 .../sql/v2/avro/AvroPartitionReaderFactory.scala   |   9 +-
 .../sql/avro/AvroCatalystDataConversionSuite.scala |   2 +-
 .../org/apache/spark/sql/avro/AvroSuite.scala      |  91 ++++++++++++------
 .../spark/sql/catalyst/json/JacksonParser.scala    |   1 +
 .../spark/sql/catalyst/util/RebaseDateTime.scala   |   4 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    | 100 ++++++++++----------
 .../parquet/VectorizedColumnReader.java            |  91 ++++++++++++------
 .../parquet/VectorizedParquetRecordReader.java     |  12 +--
 .../parquet/VectorizedPlainValuesReader.java       |  29 ++++--
 .../parquet/VectorizedRleValuesReader.java         |  31 ++++---
 .../parquet/VectorizedValuesReader.java            |   4 +-
 .../execution/datasources/DataSourceUtils.scala    | 103 ++++++++++++++++++++-
 .../sql/execution/datasources/FileScanRDD.scala    |   6 +-
 .../datasources/parquet/ParquetFileFormat.scala    |  11 +--
 .../datasources/parquet/ParquetReadSupport.scala   |   7 +-
 .../parquet/ParquetRecordMaterializer.scala        |   9 +-
 .../datasources/parquet/ParquetRowConverter.scala  |  68 ++++++--------
 .../datasources/parquet/ParquetWriteSupport.scala  |  46 ++++-----
 .../v2/parquet/ParquetPartitionReaderFactory.scala |  23 ++---
 .../benchmark/DateTimeRebaseBenchmark.scala        |   4 +-
 .../datasources/parquet/ParquetIOSuite.scala       |  89 +++++++++++-------
 .../spark/sql/sources/HadoopFsRelationTest.scala   |   6 +-
 27 files changed, 519 insertions(+), 319 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index 81c087e..4138213 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -48,5 +48,5 @@ private[spark] case class ExecutorDeadException(message: String)
  * Exception thrown when Spark returns different result after upgrading to a new version.
  */
 private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable)
-  extends SparkException("You may get a different result due to the upgrading of Spark" +
+  extends RuntimeException("You may get a different result due to the upgrading of Spark" +
     s" $version: $message", cause)
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 27206ed..4fc8040 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -34,22 +34,33 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 /**
  * A deserializer to deserialize data in avro format to data in catalyst format.
  */
-class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) {
+class AvroDeserializer(
+    rootAvroType: Schema,
+    rootCatalystType: DataType,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
 
   def this(rootAvroType: Schema, rootCatalystType: DataType) {
     this(rootAvroType, rootCatalystType,
-      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ))
+      LegacyBehaviorPolicy.withName(
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)))
   }
 
   private lazy val decimalConversions = new DecimalConversion()
 
+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+    datetimeRebaseMode, "Avro")
+
+  private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
+    datetimeRebaseMode, "Avro")
+
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
@@ -96,13 +107,8 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseD
       case (INT, IntegerType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])
 
-      case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
-        val days = value.asInstanceOf[Int]
-        val rebasedDays = rebaseJulianToGregorianDays(days)
-        updater.setInt(ordinal, rebasedDays)
-
       case (INT, DateType) => (updater, ordinal, value) =>
-        updater.setInt(ordinal, value.asInstanceOf[Int])
+        updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
 
       case (LONG, LongType) => (updater, ordinal, value) =>
         updater.setLong(ordinal, value.asInstanceOf[Long])
@@ -110,22 +116,13 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseD
       case (LONG, TimestampType) => avroType.getLogicalType match {
         // For backward compatibility, if the Avro type is Long and it is not logical type
         // (the `null` case), the value is processed as timestamp type with millisecond precision.
-        case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
-          val millis = value.asInstanceOf[Long]
-          val micros = DateTimeUtils.fromMillis(millis)
-          val rebasedMicros = rebaseJulianToGregorianMicros(micros)
-          updater.setLong(ordinal, rebasedMicros)
         case null | _: TimestampMillis => (updater, ordinal, value) =>
           val millis = value.asInstanceOf[Long]
           val micros = DateTimeUtils.fromMillis(millis)
-          updater.setLong(ordinal, micros)
-        case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
-          val micros = value.asInstanceOf[Long]
-          val rebasedMicros = rebaseJulianToGregorianMicros(micros)
-          updater.setLong(ordinal, rebasedMicros)
+          updater.setLong(ordinal, timestampRebaseFunc(micros))
         case _: TimestampMicros => (updater, ordinal, value) =>
           val micros = value.asInstanceOf[Long]
-          updater.setLong(ordinal, micros)
+          updater.setLong(ordinal, timestampRebaseFunc(micros))
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
       }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index e69c95b..59d54bc 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -124,12 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat
         reader.sync(file.start)
         val stop = file.start + file.length
 
-        val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-          reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
-          SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
-        }
+        val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+          reader.asInstanceOf[DataFileReader[_]].getMetaString,
+          SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
+
         val deserializer = new AvroDeserializer(
-          userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, rebaseDateTime)
+          userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, datetimeRebaseMode)
 
         new Iterator[InternalRow] {
           private[this] var completed = false
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
index 82a5680..ac9608c 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
@@ -43,12 +44,12 @@ private[avro] class AvroOutputWriter(
     avroSchema: Schema) extends OutputWriter {
 
   // Whether to rebase datetimes from Gregorian to Julian calendar in write
-  private val rebaseDateTime: Boolean =
-    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
+  private val datetimeRebaseMode = LegacyBehaviorPolicy.withName(
+    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE))
 
   // The input rows will never be null.
   private lazy val serializer =
-    new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime)
+    new AvroSerializer(schema, avroSchema, nullable = false, datetimeRebaseMode)
 
   /**
    * Overrides the couple of methods responsible for generating the output streams / files so
@@ -56,7 +57,11 @@ private[avro] class AvroOutputWriter(
    */
   private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = {
     val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ {
-      if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None
+      if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) {
+        Some(SPARK_LEGACY_DATETIME -> "")
+      } else {
+        None
+      }
     }
 
     new SparkAvroKeyOutputFormat(fileMeta.asJava) {
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index dc23216..d6cfbc5 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -35,8 +35,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 /**
@@ -46,17 +47,24 @@ class AvroSerializer(
     rootCatalystType: DataType,
     rootAvroType: Schema,
     nullable: Boolean,
-    rebaseDateTime: Boolean) extends Logging {
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging {
 
   def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
     this(rootCatalystType, rootAvroType, nullable,
-      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE))
+      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(
+        SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
   }
 
   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }
 
+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite(
+    datetimeRebaseMode, "Avro")
+
+  private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite(
+    datetimeRebaseMode, "Avro")
+
   private val converter: Any => Any = {
     val actualAvroType = resolveNullableType(rootAvroType, nullable)
     val baseConverter = rootCatalystType match {
@@ -146,24 +154,16 @@ class AvroSerializer(
       case (BinaryType, BYTES) =>
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
 
-      case (DateType, INT) if rebaseDateTime =>
-        (getter, ordinal) => rebaseGregorianToJulianDays(getter.getInt(ordinal))
-
       case (DateType, INT) =>
-        (getter, ordinal) => getter.getInt(ordinal)
+        (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))
 
       case (TimestampType, LONG) => avroType.getLogicalType match {
           // For backward compatibility, if the Avro type is Long and it is not logical type
           // (the `null` case), output the timestamp value as with millisecond precision.
-          case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
-            val micros = getter.getLong(ordinal)
-            val rebasedMicros = rebaseGregorianToJulianMicros(micros)
-            DateTimeUtils.toMillis(rebasedMicros)
           case null | _: TimestampMillis => (getter, ordinal) =>
-            DateTimeUtils.toMillis(getter.getLong(ordinal))
-          case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
-            rebaseGregorianToJulianMicros(getter.getLong(ordinal))
-          case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
+            DateTimeUtils.toMillis(timestampRebaseFunc(getter.getLong(ordinal)))
+          case _: TimestampMicros => (getter, ordinal) =>
+            timestampRebaseFunc(getter.getLong(ordinal))
           case other => throw new IncompatibleSchemaException(
             s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
         }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
index 712aec6..15918f4 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -88,12 +88,11 @@ case class AvroPartitionReaderFactory(
       reader.sync(partitionedFile.start)
       val stop = partitionedFile.start + partitionedFile.length
 
-      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
-        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
-      }
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString,
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
       val deserializer = new AvroDeserializer(
-        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, rebaseDateTime)
+        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, datetimeRebaseMode)
 
       val fileReader = new PartitionReader[InternalRow] {
         private[this] var completed = false
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
index 64d790b..c8a1f67 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       """.stripMargin
     val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
     val dataType = SchemaConverters.toSqlType(avroSchema).dataType
-    val deserializer = new AvroDeserializer(avroSchema, dataType, rebaseDateTime = false)
+    val deserializer = new AvroDeserializer(avroSchema, dataType)
 
     def checkDeserialization(data: GenericData.Record, expected: Any): Unit = {
       assert(checkResult(
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 3e754f0..a5c1fb1 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -33,7 +33,7 @@ import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWri
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.TestingUDT.IntervalData
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.v2.avro.AvroScan
@@ -1538,13 +1539,28 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
         val path3_0_rebase = paths(1).getCanonicalPath
         if (dt == "date") {
           val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
-          df.write.format("avro").save(path3_0)
-          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
+
+          // By default we should fail to write ancient datetime values.
+          var e = intercept[SparkException](df.write.format("avro").save(path3_0))
+          assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
+          // By default we should fail to read ancient datetime values.
+          e = intercept[SparkException](spark.read.format("avro").load(path2_4).collect())
+          assert(e.getCause.isInstanceOf[SparkUpgradeException])
+
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
+            df.write.format("avro").mode("overwrite").save(path3_0)
+          }
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
             df.write.format("avro").save(path3_0_rebase)
           }
-          checkAnswer(
-            spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
-            1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+
+          // For Avro files written by Spark 3.0, we know the writer info and don't need the config
+          // to guide the rebase behavior.
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
+            checkAnswer(
+              spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+              1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+          }
         } else {
           val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
           val avroSchema =
@@ -1556,24 +1572,39 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
               |    {"name": "ts", "type": {"type": "long", "logicalType": "$dt"}}
               |  ]
               |}""".stripMargin
-          df.write.format("avro").option("avroSchema", avroSchema).save(path3_0)
-          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
+
+          // By default we should fail to write ancient datetime values.
+          var e = intercept[SparkException] {
+            df.write.format("avro").option("avroSchema", avroSchema).save(path3_0)
+          }
+          assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
+          // By default we should fail to read ancient datetime values.
+          e = intercept[SparkException](spark.read.format("avro").load(path2_4).collect())
+          assert(e.getCause.isInstanceOf[SparkUpgradeException])
+
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
+            df.write.format("avro").option("avroSchema", avroSchema).mode("overwrite").save(path3_0)
+          }
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
             df.write.format("avro").option("avroSchema", avroSchema).save(path3_0_rebase)
           }
-          checkAnswer(
-            spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
-            1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+
+          // For Avro files written by Spark 3.0, we know the writer info and don't need the config
+          // to guide the rebase behavior.
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
+            checkAnswer(
+              spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+              1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+          }
         }
       }
     }
 
-    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
-      checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01")
-      checkReadMixedFiles(
-        "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456")
-      checkReadMixedFiles(
-        "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124")
-    }
+    checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01")
+    checkReadMixedFiles(
+      "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456")
+    checkReadMixedFiles(
+      "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124")
   }
 
   test("SPARK-31183: rebasing microseconds timestamps in write") {
@@ -1581,7 +1612,7 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
     val nonRebased = "1001-01-07 01:09:05.123456"
     withTempPath { dir =>
       val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
         Seq(tsStr).toDF("tsS")
           .select($"tsS".cast("timestamp").as("ts"))
           .write.format("avro")
@@ -1589,9 +1620,9 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
       }
 
       // The file metadata indicates if it needs rebase or not, so we can always get the correct
-      // result regardless of the "rebaseInRead" config.
-      Seq(true, false).foreach { rebase =>
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+      // result regardless of the "rebase mode" config.
+      Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
           checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
         }
       }
@@ -1622,7 +1653,7 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
         |}""".stripMargin
       withTempPath { dir =>
         val path = dir.getAbsolutePath
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
           Seq(tsStr).toDF("tsS")
             .select($"tsS".cast("timestamp").as("ts"))
             .write
@@ -1632,9 +1663,9 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
         }
 
         // The file metadata indicates if it needs rebase or not, so we can always get the correct
-        // result regardless of the "rebaseInRead" config.
-        Seq(true, false).foreach { rebase =>
-          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+        // result regardless of the "rebase mode" config.
+        Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
             checkAnswer(
               spark.read.schema("ts timestamp").format("avro").load(path),
               Row(Timestamp.valueOf(rebased)))
@@ -1655,7 +1686,7 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
   test("SPARK-31183: rebasing dates in write") {
     withTempPath { dir =>
       val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
         Seq("1001-01-01").toDF("dateS")
           .select($"dateS".cast("date").as("date"))
           .write.format("avro")
@@ -1663,9 +1694,9 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
       }
 
       // The file metadata indicates if it needs rebase or not, so we can always get the correct
-      // result regardless of the "rebaseInRead" config.
-      Seq(true, false).foreach { rebase =>
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+      // result regardless of the "rebase mode" config.
+      Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
           checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
         }
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index a52c345..ef98793 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -456,6 +456,7 @@ class JacksonParser(
         }
       }
     } catch {
+      case e: SparkUpgradeException => throw e
       case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
         // JSON parser currently doesn't support partial results for corrupted records.
         // For such records, all fields other than the field configured by
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
index eb67ff7..e29fa4b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
@@ -146,6 +146,8 @@ object RebaseDateTime {
     -354226, -317702, -244653, -208129, -171605, -141436, -141435, -141434,
     -141433, -141432, -141431, -141430, -141429, -141428, -141427)
 
+  final val lastSwitchGregorianDay: Int = gregJulianDiffSwitchDay.last
+
   // The first days of Common Era (CE) which is mapped to the '0001-01-01' date
   // in Proleptic Gregorian calendar.
   private final val gregorianCommonEraStartDay = gregJulianDiffSwitchDay(0)
@@ -295,7 +297,7 @@ object RebaseDateTime {
   }
   // The switch time point after which all diffs between Gregorian and Julian calendars
   // across all time zones are zero
-  private final val lastSwitchGregorianTs: Long = getLastSwitchTs(gregJulianRebaseMap)
+  final val lastSwitchGregorianTs: Long = getLastSwitchTs(gregJulianRebaseMap)
 
   private final val gregorianStartTs = LocalDateTime.of(gregorianStartDate, LocalTime.MIDNIGHT)
   private final val julianEndTs = LocalDateTime.of(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b073f7e..d0b55ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2500,57 +2500,63 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE =
-    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled")
-      .internal()
-      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
-        "to the hybrid calendar (Julian + Gregorian) in write. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+  val LEGACY_PARQUET_REBASE_MODE_IN_WRITE =
+    buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInWrite")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files. " +
+        "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
+        "ancient dates/timestamps that are ambiguous between the two calendars.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
-
-  val LEGACY_PARQUET_REBASE_DATETIME_IN_READ =
-    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled")
-      .internal()
-      .doc("When true, rebase dates/timestamps " +
-        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
+  val LEGACY_PARQUET_REBASE_MODE_IN_READ =
+    buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
+        "Gregorian) calendar to Proleptic Gregorian calendar when reading Parquet files. " +
+        "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
+        "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
+        "only effective if the writer info (like Spark, Hive) of the Parquet files is unknown.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
 
-  val LEGACY_AVRO_REBASE_DATETIME_IN_WRITE =
-    buildConf("spark.sql.legacy.avro.rebaseDateTimeInWrite.enabled")
+  val LEGACY_AVRO_REBASE_MODE_IN_WRITE =
+    buildConf("spark.sql.legacy.avro.datetimeRebaseModeInWrite")
       .internal()
-      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
-        "to the hybrid calendar (Julian + Gregorian) in write. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the legacy hybrid (Julian + Gregorian) calendar when writing Avro files. " +
+        "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
+        "ancient dates/timestamps that are ambiguous between the two calendars.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
-
-  val LEGACY_AVRO_REBASE_DATETIME_IN_READ =
-    buildConf("spark.sql.legacy.avro.rebaseDateTimeInRead.enabled")
-      .internal()
-      .doc("When true, rebase dates/timestamps " +
-        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
+  val LEGACY_AVRO_REBASE_MODE_IN_READ =
+    buildConf("spark.sql.legacy.avro.datetimeRebaseModeInRead")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
+        "Gregorian) calendar to Proleptic Gregorian calendar when reading Avro files. " +
+        "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
+        "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
+        "only effective if the writer info (like Spark, Hive) of the Avro files is unknown.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
 
   val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT =
     buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds")
@@ -3136,10 +3142,6 @@ class SQLConf extends Serializable with Logging {
 
   def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
 
-  def parquetRebaseDateTimeInReadEnabled: Boolean = {
-    getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ)
-  }
-
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 11ce11d..f264281 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -36,6 +36,7 @@ import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
+import org.apache.spark.sql.execution.datasources.DataSourceUtils;
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.DataTypes;
@@ -102,14 +103,14 @@ public class VectorizedColumnReader {
   // The timezone conversion to apply to int96 timestamps. Null if no conversion.
   private final ZoneId convertTz;
   private static final ZoneId UTC = ZoneOffset.UTC;
-  private final boolean rebaseDateTime;
+  private final String datetimeRebaseMode;
 
   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
       OriginalType originalType,
       PageReader pageReader,
       ZoneId convertTz,
-      boolean rebaseDateTime) throws IOException {
+      String datetimeRebaseMode) throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
     this.convertTz = convertTz;
@@ -132,7 +133,9 @@ public class VectorizedColumnReader {
     if (totalValueCount == 0) {
       throw new IOException("totalValueCount == 0");
     }
-    this.rebaseDateTime = rebaseDateTime;
+    assert "LEGACY".equals(datetimeRebaseMode) || "EXCEPTION".equals(datetimeRebaseMode) ||
+      "CORRECTED".equals(datetimeRebaseMode);
+    this.datetimeRebaseMode = datetimeRebaseMode;
   }
 
   /**
@@ -156,11 +159,11 @@ public class VectorizedColumnReader {
     boolean isSupported = false;
     switch (typeName) {
       case INT32:
-        isSupported = originalType != OriginalType.DATE || !rebaseDateTime;
+        isSupported = originalType != OriginalType.DATE || "CORRECTED".equals(datetimeRebaseMode);
         break;
       case INT64:
         if (originalType == OriginalType.TIMESTAMP_MICROS) {
-          isSupported = !rebaseDateTime;
+          isSupported = "CORRECTED".equals(datetimeRebaseMode);
         } else {
           isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
         }
@@ -174,6 +177,30 @@ public class VectorizedColumnReader {
     return isSupported;
   }
 
+  static int rebaseDays(int julianDays, final boolean failIfRebase) {
+    if (failIfRebase) {
+      if (julianDays < RebaseDateTime.lastSwitchJulianDay()) {
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
+      } else {
+        return julianDays;
+      }
+    } else {
+      return RebaseDateTime.rebaseJulianToGregorianDays(julianDays);
+    }
+  }
+
+  static long rebaseMicros(long julianMicros, final boolean failIfRebase) {
+    if (failIfRebase) {
+      if (julianMicros < RebaseDateTime.lastSwitchJulianTs()) {
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
+      } else {
+        return julianMicros;
+      }
+    } else {
+      return RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
+    }
+  }
+
   /**
    * Reads `total` values from this columnReader into column.
    */
@@ -283,7 +310,7 @@ public class VectorizedColumnReader {
       case INT32:
         if (column.dataType() == DataTypes.IntegerType ||
             DecimalType.is32BitDecimalType(column.dataType()) ||
-            (column.dataType() == DataTypes.DateType && !rebaseDateTime)) {
+            (column.dataType() == DataTypes.DateType && "CORRECTED".equals(datetimeRebaseMode))) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
@@ -302,11 +329,11 @@ public class VectorizedColumnReader {
             }
           }
         } else if (column.dataType() == DataTypes.DateType) {
+          final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i));
-              int gregorianDays = RebaseDateTime.rebaseJulianToGregorianDays(julianDays);
-              column.putInt(i, gregorianDays);
+              column.putInt(i, rebaseDays(julianDays, failIfRebase));
             }
           }
         } else {
@@ -317,36 +344,37 @@ public class VectorizedColumnReader {
       case INT64:
         if (column.dataType() == DataTypes.LongType ||
             DecimalType.is64BitDecimalType(column.dataType()) ||
-            (originalType == OriginalType.TIMESTAMP_MICROS && !rebaseDateTime)) {
+            (originalType == OriginalType.TIMESTAMP_MICROS &&
+              "CORRECTED".equals(datetimeRebaseMode))) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
             }
           }
         } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
-          if (rebaseDateTime) {
+          if ("CORRECTED".equals(datetimeRebaseMode)) {
             for (int i = rowId; i < rowId + num; ++i) {
               if (!column.isNullAt(i)) {
-                long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
-                long julianMicros = DateTimeUtils.fromMillis(julianMillis);
-                long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
-                column.putLong(i, gregorianMicros);
+                long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+                column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis));
               }
             }
           } else {
+            final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
             for (int i = rowId; i < rowId + num; ++i) {
               if (!column.isNullAt(i)) {
-                long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
-                column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis));
+                long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+                long julianMicros = DateTimeUtils.fromMillis(julianMillis);
+                column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
               }
             }
           }
         } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
+          final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i));
-              long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
-              column.putLong(i, gregorianMicros);
+              column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
             }
           }
         } else {
@@ -466,12 +494,13 @@ public class VectorizedColumnReader {
       defColumn.readShorts(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else if (column.dataType() == DataTypes.DateType ) {
-      if (rebaseDateTime) {
-        defColumn.readIntegersWithRebase(
+      if ("CORRECTED".equals(datetimeRebaseMode)) {
+        defColumn.readIntegers(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
       } else {
-        defColumn.readIntegers(
-           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+        boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
+        defColumn.readIntegersWithRebase(
+          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase);
       }
     } else {
       throw constructConvertNotSupportedException(descriptor, column);
@@ -485,27 +514,29 @@ public class VectorizedColumnReader {
       defColumn.readLongs(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
-      if (rebaseDateTime) {
-        defColumn.readLongsWithRebase(
-          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
-      } else {
+      if ("CORRECTED".equals(datetimeRebaseMode)) {
         defColumn.readLongs(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+      } else {
+        boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
+        defColumn.readLongsWithRebase(
+          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase);
       }
     } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
-      if (rebaseDateTime) {
+      if ("CORRECTED".equals(datetimeRebaseMode)) {
         for (int i = 0; i < num; i++) {
           if (defColumn.readInteger() == maxDefLevel) {
-            long micros = DateTimeUtils.fromMillis(dataColumn.readLong());
-            column.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(micros));
+            column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
           } else {
             column.putNull(rowId + i);
           }
         }
       } else {
+        final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
         for (int i = 0; i < num; i++) {
           if (defColumn.readInteger() == maxDefLevel) {
-            column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
+            long julianMicros = DateTimeUtils.fromMillis(dataColumn.readLong());
+            column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase));
           } else {
             column.putNull(rowId + i);
           }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index c9590b9..b40cc15 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -89,9 +89,9 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private final ZoneId convertTz;
 
   /**
-   * true if need to rebase date/timestamp from Julian to Proleptic Gregorian calendar.
+   * The mode of rebasing date/timestamp from Julian to Proleptic Gregorian calendar.
    */
-  private final boolean rebaseDateTime;
+  private final String datetimeRebaseMode;
 
   /**
    * columnBatch object that is used for batch decoding. This is created on first use and triggers
@@ -122,16 +122,16 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private final MemoryMode MEMORY_MODE;
 
   public VectorizedParquetRecordReader(
-    ZoneId convertTz, boolean rebaseDateTime, boolean useOffHeap, int capacity) {
+    ZoneId convertTz, String datetimeRebaseMode, boolean useOffHeap, int capacity) {
     this.convertTz = convertTz;
-    this.rebaseDateTime = rebaseDateTime;
+    this.datetimeRebaseMode = datetimeRebaseMode;
     MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
     this.capacity = capacity;
   }
 
   // For test only.
   public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) {
-    this(null, false, useOffHeap, capacity);
+    this(null, "CORRECTED", useOffHeap, capacity);
   }
 
   /**
@@ -321,7 +321,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     for (int i = 0; i < columns.size(); ++i) {
       if (missingColumns[i]) continue;
       columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(),
-        pages.getPageReader(columns.get(i)), convertTz, rebaseDateTime);
+        pages.getPageReader(columns.get(i)), convertTz, datetimeRebaseMode);
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 2ed2e11..eddbf39 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -21,13 +21,14 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.ParquetDecodingException;
+
 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
+import org.apache.spark.sql.execution.datasources.DataSourceUtils;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.api.Binary;
-
 /**
  * An implementation of the Parquet PLAIN decoder that supports the vectorized interface.
  */
@@ -86,7 +87,8 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   // iterates the values twice: check if we need to rebase first, then go to the optimized branch
   // if rebase is not needed.
   @Override
-  public final void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) {
+  public final void readIntegersWithRebase(
+      int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
     int requiredBytes = total * 4;
     ByteBuffer buffer = getBuffer(requiredBytes);
     boolean rebase = false;
@@ -94,8 +96,12 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
       rebase |= buffer.getInt(buffer.position() + i * 4) < RebaseDateTime.lastSwitchJulianDay();
     }
     if (rebase) {
-      for (int i = 0; i < total; i += 1) {
-        c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt()));
+      if (failIfRebase) {
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
+      } else {
+        for (int i = 0; i < total; i += 1) {
+          c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt()));
+        }
       }
     } else {
       if (buffer.hasArray()) {
@@ -128,7 +134,8 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   // iterates the values twice: check if we need to rebase first, then go to the optimized branch
   // if rebase is not needed.
   @Override
-  public final void readLongsWithRebase(int total, WritableColumnVector c, int rowId) {
+  public final void readLongsWithRebase(
+      int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
     int requiredBytes = total * 8;
     ByteBuffer buffer = getBuffer(requiredBytes);
     boolean rebase = false;
@@ -136,8 +143,12 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
       rebase |= buffer.getLong(buffer.position() + i * 8) < RebaseDateTime.lastSwitchJulianTs();
     }
     if (rebase) {
-      for (int i = 0; i < total; i += 1) {
-        c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong()));
+      if (failIfRebase) {
+        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
+      } else {
+        for (int i = 0; i < total; i += 1) {
+          c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong()));
+        }
       }
     } else {
       if (buffer.hasArray()) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 4d72a33..24347a4e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.datasources.parquet;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
@@ -26,12 +29,8 @@ import org.apache.parquet.column.values.bitpacking.Packer;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 
-import org.apache.spark.sql.catalyst.util.RebaseDateTime;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 /**
  * A values reader for Parquet's run-length encoded data. This is based off of the version in
  * parquet-mr with these changes:
@@ -211,7 +210,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
       WritableColumnVector c,
       int rowId,
       int level,
-      VectorizedValuesReader data) throws IOException {
+      VectorizedValuesReader data,
+      final boolean failIfRebase) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -219,7 +219,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
       switch (mode) {
         case RLE:
           if (currentValue == level) {
-            data.readIntegersWithRebase(n, c, rowId);
+            data.readIntegersWithRebase(n, c, rowId, failIfRebase);
           } else {
             c.putNulls(rowId, n);
           }
@@ -227,8 +227,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case PACKED:
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
-              c.putInt(rowId + i,
-                RebaseDateTime.rebaseJulianToGregorianDays(data.readInteger()));
+              int julianDays = data.readInteger();
+              c.putInt(rowId + i, VectorizedColumnReader.rebaseDays(julianDays, failIfRebase));
             } else {
               c.putNull(rowId + i);
             }
@@ -387,7 +387,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
       WritableColumnVector c,
       int rowId,
       int level,
-      VectorizedValuesReader data) throws IOException {
+      VectorizedValuesReader data,
+      final boolean failIfRebase) throws IOException {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -395,7 +396,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
       switch (mode) {
         case RLE:
           if (currentValue == level) {
-            data.readLongsWithRebase(n, c, rowId);
+            data.readLongsWithRebase(n, c, rowId, failIfRebase);
           } else {
             c.putNulls(rowId, n);
           }
@@ -403,8 +404,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case PACKED:
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
-              c.putLong(rowId + i,
-                RebaseDateTime.rebaseJulianToGregorianMicros(data.readLong()));
+              long julianMicros = data.readLong();
+              c.putLong(rowId + i, VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase));
             } else {
               c.putNull(rowId + i);
             }
@@ -584,7 +585,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
   }
 
   @Override
-  public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) {
+  public void readIntegersWithRebase(
+      int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
@@ -604,7 +606,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
   }
 
   @Override
-  public void readLongsWithRebase(int total, WritableColumnVector c, int rowId) {
+  public void readLongsWithRebase(
+      int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index 809ac44..35db8f2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -40,9 +40,9 @@ public interface VectorizedValuesReader {
   void readBooleans(int total, WritableColumnVector c, int rowId);
   void readBytes(int total, WritableColumnVector c, int rowId);
   void readIntegers(int total, WritableColumnVector c, int rowId);
-  void readIntegersWithRebase(int total, WritableColumnVector c, int rowId);
+  void readIntegersWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase);
   void readLongs(int total, WritableColumnVector c, int rowId);
-  void readLongsWithRebase(int total, WritableColumnVector c, int rowId);
+  void readLongsWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase);
   void readFloats(int total, WritableColumnVector c, int rowId);
   void readDoubles(int total, WritableColumnVector c, int rowId);
   void readBinary(int total, WritableColumnVector c, int rowId);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 45a9b1a..abb74d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -23,9 +23,12 @@ import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.SparkUpgradeException
 import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.RebaseDateTime
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -84,17 +87,107 @@ object DataSourceUtils {
       case _ => false
     }
 
-  def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
+  def datetimeRebaseMode(
+      lookupFileMeta: String => String,
+      modeByConfig: String): LegacyBehaviorPolicy.Value = {
     if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
-      return Some(false)
+      return LegacyBehaviorPolicy.CORRECTED
     }
-    // If there is no version, we return None and let the caller side to decide.
+    // If there is no version, we return the mode specified by the config.
     Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
       // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
       // rebase the datetime values.
       // Files written by Spark 3.0 and latter may also need the rebase if they were written with
-      // the "rebaseInWrite" config enabled.
-      version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null
+      // the "LEGACY" rebase mode.
+      if (version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+
+  def newRebaseExceptionInRead(format: String): SparkUpgradeException = {
+    val config = if (format == "Parquet") {
+      SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key
+    } else if (format == "Avro") {
+      SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key
+    } else {
+      throw new IllegalStateException("unrecognized format " + format)
     }
+    new SparkUpgradeException("3.0", "reading dates before 1582-10-15 or timestamps before " +
+      s"1900-01-01T00:00:00Z from $format files can be ambiguous, as the files may be written by " +
+      "Spark 2.x or legacy versions of Hive, which uses a legacy hybrid calendar that is " +
+      "different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in " +
+      s"SPARK-31404. You can set $config to 'LEGACY' to rebase the datetime values w.r.t. " +
+      s"the calendar difference during reading. Or set $config to 'CORRECTED' to read the " +
+      "datetime values as it is.", null)
+  }
+
+  def newRebaseExceptionInWrite(format: String): SparkUpgradeException = {
+    val config = if (format == "Parquet") {
+      SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key
+    } else if (format == "Avro") {
+      SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key
+    } else {
+      throw new IllegalStateException("unrecognized format " + format)
+    }
+    new SparkUpgradeException("3.0", "writing dates before 1582-10-15 or timestamps before " +
+      s"1900-01-01T00:00:00Z into $format files can be dangerous, as the files may be read by " +
+      "Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is " +
+      "different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in " +
+      s"SPARK-31404. You can set $config to 'LEGACY' to rebase the datetime values w.r.t. " +
+      "the calendar difference during writing, to get maximum interoperability. Or set " +
+      s"$config to 'CORRECTED' to write the datetime values as it is, if you are 100% sure that " +
+      "the written files will only be read by Spark 3.0+ or other systems that use Proleptic " +
+      "Gregorian calendar.", null)
+  }
+
+  def creteDateRebaseFuncInRead(
+      rebaseMode: LegacyBehaviorPolicy.Value,
+      format: String): Int => Int = rebaseMode match {
+    case LegacyBehaviorPolicy.EXCEPTION => days: Int =>
+      if (days < RebaseDateTime.lastSwitchJulianDay) {
+        throw DataSourceUtils.newRebaseExceptionInRead(format)
+      }
+      days
+    case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianDays
+    case LegacyBehaviorPolicy.CORRECTED => identity[Int]
+  }
+
+  def creteDateRebaseFuncInWrite(
+      rebaseMode: LegacyBehaviorPolicy.Value,
+      format: String): Int => Int = rebaseMode match {
+    case LegacyBehaviorPolicy.EXCEPTION => days: Int =>
+      if (days < RebaseDateTime.lastSwitchGregorianDay) {
+        throw DataSourceUtils.newRebaseExceptionInWrite(format)
+      }
+      days
+    case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianDays
+    case LegacyBehaviorPolicy.CORRECTED => identity[Int]
+  }
+
+  def creteTimestampRebaseFuncInRead(
+      rebaseMode: LegacyBehaviorPolicy.Value,
+      format: String): Long => Long = rebaseMode match {
+    case LegacyBehaviorPolicy.EXCEPTION => micros: Long =>
+      if (micros < RebaseDateTime.lastSwitchJulianTs) {
+        throw DataSourceUtils.newRebaseExceptionInRead(format)
+      }
+      micros
+    case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianMicros
+    case LegacyBehaviorPolicy.CORRECTED => identity[Long]
+  }
+
+  def creteTimestampRebaseFuncInWrite(
+      rebaseMode: LegacyBehaviorPolicy.Value,
+      format: String): Long => Long = rebaseMode match {
+    case LegacyBehaviorPolicy.EXCEPTION => micros: Long =>
+      if (micros < RebaseDateTime.lastSwitchGregorianTs) {
+        throw DataSourceUtils.newRebaseExceptionInWrite(format)
+      }
+      micros
+    case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
+    case LegacyBehaviorPolicy.CORRECTED => identity[Long]
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 542c996..fc59336 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -21,7 +21,7 @@ import java.io.{FileNotFoundException, IOException}
 
 import org.apache.parquet.io.ParquetDecodingException
 
-import org.apache.spark.{Partition => RDDPartition, TaskContext}
+import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
@@ -178,7 +178,9 @@ class FileScanRDD(
                 s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
               throw new QueryExecutionException(message, e)
             case e: ParquetDecodingException =>
-              if (e.getMessage.contains("Can not read value at")) {
+              if (e.getCause.isInstanceOf[SparkUpgradeException]) {
+                throw e.getCause
+              } else if (e.getMessage.contains("Can not read value at")) {
                 val message = "Encounter error while reading parquet files. " +
                   "One possible cause: Parquet column cannot be converted in the " +
                   "corresponding files. Details: "
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c6d9ddf..7187410 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -300,10 +300,9 @@ class ParquetFileFormat
           None
         }
 
-      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
-        footerFileMetaData.getKeyValueMetaData.get).getOrElse {
-        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ)
-      }
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
 
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
       val hadoopAttemptContext =
@@ -318,7 +317,7 @@ class ParquetFileFormat
       if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
           convertTz.orNull,
-          rebaseDateTime,
+          datetimeRebaseMode.toString,
           enableOffHeapColumnVector && taskContext.isDefined,
           capacity)
         val iter = new RecordReaderIterator(vectorizedReader)
@@ -337,7 +336,7 @@ class ParquetFileFormat
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns InternalRow
         val readSupport = new ParquetReadSupport(
-          convertTz, enableVectorizedReader = false, rebaseDateTime)
+          convertTz, enableVectorizedReader = false, datetimeRebaseMode)
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
           new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 28165e0..a30d1c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -32,6 +32,7 @@ import org.apache.parquet.schema.Type.Repetition
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 /**
@@ -53,7 +54,7 @@ import org.apache.spark.sql.types._
 class ParquetReadSupport(
     val convertTz: Option[ZoneId],
     enableVectorizedReader: Boolean,
-    rebaseDateTime: Boolean)
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value)
   extends ReadSupport[InternalRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
@@ -61,7 +62,7 @@ class ParquetReadSupport(
     // We need a zero-arg constructor for SpecificParquetRecordReaderBase.  But that is only
     // used in the vectorized reader, where we get the convertTz/rebaseDateTime value directly,
     // and the values here are ignored.
-    this(None, enableVectorizedReader = true, rebaseDateTime = false)
+    this(None, enableVectorizedReader = true, datetimeRebaseMode = LegacyBehaviorPolicy.CORRECTED)
   }
 
   /**
@@ -130,7 +131,7 @@ class ParquetReadSupport(
       ParquetReadSupport.expandUDT(catalystRequestedSchema),
       new ParquetToSparkSchemaConverter(conf),
       convertTz,
-      rebaseDateTime)
+      datetimeRebaseMode)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index ec03713..bb528d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -23,6 +23,7 @@ import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -32,19 +33,19 @@ import org.apache.spark.sql.types.StructType
  * @param catalystSchema Catalyst schema of the rows to be constructed
  * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
  * @param convertTz the optional time zone to convert to int96 data
- * @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian
- *                       calendar
+ * @param datetimeRebaseMode the mode of rebasing date/timestamp from Julian to Proleptic Gregorian
+ *                           calendar
  */
 private[parquet] class ParquetRecordMaterializer(
     parquetSchema: MessageType,
     catalystSchema: StructType,
     schemaConverter: ParquetToSparkSchemaConverter,
     convertTz: Option[ZoneId],
-    rebaseDateTime: Boolean)
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value)
   extends RecordMaterializer[InternalRow] {
 
   private val rootConverter = new ParquetRowConverter(
-    schemaConverter, parquetSchema, catalystSchema, convertTz, rebaseDateTime, NoopUpdater)
+    schemaConverter, parquetSchema, catalystSchema, convertTz, datetimeRebaseMode, NoopUpdater)
 
   override def getCurrentRecord: InternalRow = rootConverter.currentRecord
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 08fbca2..9d37f17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -35,8 +35,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -121,8 +122,8 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
  *        types should have been expanded.
  * @param convertTz the optional time zone to convert to int96 data
- * @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian
- *                       calendar
+ * @param datetimeRebaseMode the mode of rebasing date/timestamp from Julian to Proleptic Gregorian
+ *                           calendar
  * @param updater An updater which propagates converted field values to the parent container
  */
 private[parquet] class ParquetRowConverter(
@@ -130,7 +131,7 @@ private[parquet] class ParquetRowConverter(
     parquetType: GroupType,
     catalystType: StructType,
     convertTz: Option[ZoneId],
-    rebaseDateTime: Boolean,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value,
     updater: ParentContainerUpdater)
   extends ParquetGroupConverter(updater) with Logging {
 
@@ -181,6 +182,12 @@ private[parquet] class ParquetRowConverter(
    */
   def currentRecord: InternalRow = currentRow
 
+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
+    datetimeRebaseMode, "Parquet")
+
+  private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
+    datetimeRebaseMode, "Parquet")
+
   // Converters for each field.
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
@@ -275,35 +282,17 @@ private[parquet] class ParquetRowConverter(
         new ParquetStringConverter(updater)
 
       case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
-        if (rebaseDateTime) {
-          new ParquetPrimitiveConverter(updater) {
-            override def addLong(value: Long): Unit = {
-              val rebased = rebaseJulianToGregorianMicros(value)
-              updater.setLong(rebased)
-            }
-          }
-        } else {
-          new ParquetPrimitiveConverter(updater) {
-            override def addLong(value: Long): Unit = {
-              updater.setLong(value)
-            }
+        new ParquetPrimitiveConverter(updater) {
+          override def addLong(value: Long): Unit = {
+            updater.setLong(timestampRebaseFunc(value))
           }
         }
 
       case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
-        if (rebaseDateTime) {
-          new ParquetPrimitiveConverter(updater) {
-            override def addLong(value: Long): Unit = {
-              val micros = DateTimeUtils.fromMillis(value)
-              val rebased = rebaseJulianToGregorianMicros(micros)
-              updater.setLong(rebased)
-            }
-          }
-        } else {
-          new ParquetPrimitiveConverter(updater) {
-            override def addLong(value: Long): Unit = {
-              updater.setLong(DateTimeUtils.fromMillis(value))
-            }
+        new ParquetPrimitiveConverter(updater) {
+          override def addLong(value: Long): Unit = {
+            val micros = DateTimeUtils.fromMillis(value)
+            updater.setLong(timestampRebaseFunc(micros))
           }
         }
 
@@ -328,17 +317,9 @@ private[parquet] class ParquetRowConverter(
         }
 
       case DateType =>
-        if (rebaseDateTime) {
-          new ParquetPrimitiveConverter(updater) {
-            override def addInt(value: Int): Unit = {
-              updater.set(rebaseJulianToGregorianDays(value))
-            }
-          }
-        } else {
-          new ParquetPrimitiveConverter(updater) {
-            override def addInt(value: Int): Unit = {
-              updater.set(value)
-            }
+        new ParquetPrimitiveConverter(updater) {
+          override def addInt(value: Int): Unit = {
+            updater.set(dateRebaseFunc(value))
           }
         }
 
@@ -386,7 +367,12 @@ private[parquet] class ParquetRowConverter(
           }
         }
         new ParquetRowConverter(
-          schemaConverter, parquetType.asGroupType(), t, convertTz, rebaseDateTime, wrappedUpdater)
+          schemaConverter,
+          parquetType.asGroupType(),
+          t,
+          convertTz,
+          datetimeRebaseMode,
+          wrappedUpdater)
 
       case t =>
         throw new RuntimeException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index e367b9c..4e535c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -35,8 +35,9 @@ import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 /**
@@ -78,9 +79,14 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
   private val decimalBuffer =
     new Array[Byte](Decimal.minBytesForPrecision(DecimalType.MAX_PRECISION))
 
-  // Whether to rebase datetimes from Gregorian to Julian calendar in write
-  private val rebaseDateTime: Boolean =
-    SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE)
+  private val datetimeRebaseMode = LegacyBehaviorPolicy.withName(
+    SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE))
+
+  private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite(
+    datetimeRebaseMode, "Parquet")
+
+  private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite(
+    datetimeRebaseMode, "Parquet")
 
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
@@ -103,7 +109,13 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
     val metadata = Map(
       SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT,
       ParquetReadSupport.SPARK_METADATA_KEY -> schemaString
-    ) ++ (if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None)
+    ) ++ {
+      if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) {
+        Some(SPARK_LEGACY_DATETIME -> "")
+      } else {
+        None
+      }
+    }
 
     logInfo(
       s"""Initialized Parquet WriteSupport with Catalyst schema:
@@ -152,12 +164,11 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         (row: SpecializedGetters, ordinal: Int) =>
           recordConsumer.addInteger(row.getShort(ordinal))
 
-      case DateType if rebaseDateTime =>
+      case DateType =>
         (row: SpecializedGetters, ordinal: Int) =>
-          val rebasedDays = rebaseGregorianToJulianDays(row.getInt(ordinal))
-          recordConsumer.addInteger(rebasedDays)
+          recordConsumer.addInteger(dateRebaseFunc(row.getInt(ordinal)))
 
-      case IntegerType | DateType =>
+      case IntegerType =>
         (row: SpecializedGetters, ordinal: Int) =>
           recordConsumer.addInteger(row.getInt(ordinal))
 
@@ -187,24 +198,15 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
               buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
               recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
 
-          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS if rebaseDateTime =>
-            (row: SpecializedGetters, ordinal: Int) =>
-              val rebasedMicros = rebaseGregorianToJulianMicros(row.getLong(ordinal))
-              recordConsumer.addLong(rebasedMicros)
-
           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
             (row: SpecializedGetters, ordinal: Int) =>
-              recordConsumer.addLong(row.getLong(ordinal))
-
-          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS if rebaseDateTime =>
-            (row: SpecializedGetters, ordinal: Int) =>
-              val rebasedMicros = rebaseGregorianToJulianMicros(row.getLong(ordinal))
-              val millis = DateTimeUtils.toMillis(rebasedMicros)
-              recordConsumer.addLong(millis)
+              val micros = row.getLong(ordinal)
+              recordConsumer.addLong(timestampRebaseFunc(micros))
 
           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
             (row: SpecializedGetters, ordinal: Int) =>
-              val millis = DateTimeUtils.toMillis(row.getLong(ordinal))
+              val micros = row.getLong(ordinal)
+              val millis = DateTimeUtils.toMillis(timestampRebaseFunc(micros))
               recordConsumer.addLong(millis)
         }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 1925fa1..3b482b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{AtomicType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -116,8 +117,9 @@ case class ParquetPartitionReaderFactory(
   private def buildReaderBase[T](
       file: PartitionedFile,
       buildReaderFunc: (
-        ParquetInputSplit, InternalRow, TaskAttemptContextImpl, Option[FilterPredicate],
-          Option[ZoneId], Boolean) => RecordReader[Void, T]): RecordReader[Void, T] = {
+        ParquetInputSplit, InternalRow, TaskAttemptContextImpl,
+          Option[FilterPredicate], Option[ZoneId],
+          LegacyBehaviorPolicy.Value) => RecordReader[Void, T]): RecordReader[Void, T] = {
     val conf = broadcastedConf.value.value
 
     val filePath = new Path(new URI(file.filePath))
@@ -169,12 +171,11 @@ case class ParquetPartitionReaderFactory(
     if (pushed.isDefined) {
       ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
     }
-    val rebaseDatetime = DataSourceUtils.needRebaseDateTime(
-      footerFileMetaData.getKeyValueMetaData.get).getOrElse {
-      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ)
-    }
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get,
+      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
     val reader = buildReaderFunc(
-      split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, rebaseDatetime)
+      split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, datetimeRebaseMode)
     reader.initialize(split, hadoopAttemptContext)
     reader
   }
@@ -189,12 +190,12 @@ case class ParquetPartitionReaderFactory(
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],
       convertTz: Option[ZoneId],
-      needDateTimeRebase: Boolean): RecordReader[Void, InternalRow] = {
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value): RecordReader[Void, InternalRow] = {
     logDebug(s"Falling back to parquet-mr")
     val taskContext = Option(TaskContext.get())
     // ParquetRecordReader returns InternalRow
     val readSupport = new ParquetReadSupport(
-      convertTz, enableVectorizedReader = false, needDateTimeRebase)
+      convertTz, enableVectorizedReader = false, datetimeRebaseMode)
     val reader = if (pushed.isDefined && enableRecordFilter) {
       val parquetFilter = FilterCompat.get(pushed.get, null)
       new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -220,11 +221,11 @@ case class ParquetPartitionReaderFactory(
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],
       convertTz: Option[ZoneId],
-      rebaseDatetime: Boolean): VectorizedParquetRecordReader = {
+      datetimeRebaseMode: LegacyBehaviorPolicy.Value): VectorizedParquetRecordReader = {
     val taskContext = Option(TaskContext.get())
     val vectorizedReader = new VectorizedParquetRecordReader(
       convertTz.orNull,
-      rebaseDatetime,
+      datetimeRebaseMode.toString,
       enableOffHeapColumnVector && taskContext.isDefined,
       capacity)
     val iter = new RecordReaderIterator(vectorizedReader)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
index aa47d36..d6167f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.SECONDS_PER_DAY
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 
 object DateTime extends Enumeration {
@@ -161,9 +162,10 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
               Seq(true, false).foreach { modernDates =>
                 Seq(false, true).foreach { rebase =>
                   benchmark.addCase(caseName(modernDates, dateTime, Some(rebase)), 1) { _ =>
+                    val mode = if (rebase) LEGACY else CORRECTED
                     withSQLConf(
                       SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> getOutputType(dateTime),
-                      SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> rebase.toString) {
+                      SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> mode.toString) {
                       genDF(rowsNum, dateTime, modernDates)
                         .write
                         .mode("overwrite")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index cf2c7c8..87b4db3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -41,7 +41,7 @@ import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, SparkUpgradeException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -892,41 +893,67 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         val path3_0_rebase = paths(1).getCanonicalPath
         if (dt == "date") {
           val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
-          df.write.parquet(path3_0)
-          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+
+          // By default we should fail to write ancient datetime values.
+          var e = intercept[SparkException](df.write.parquet(path3_0))
+          assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
+          // By default we should fail to read ancient datetime values.
+          e = intercept[SparkException](spark.read.parquet(path2_4).collect())
+          assert(e.getCause.isInstanceOf[SparkUpgradeException])
+
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
+            df.write.mode("overwrite").parquet(path3_0)
+          }
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
             df.write.parquet(path3_0_rebase)
           }
-          checkAnswer(
-            spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
-            1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+
+          // For Parquet files written by Spark 3.0, we know the writer info and don't need the
+          // config to guide the rebase behavior.
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
+            checkAnswer(
+              spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
+              1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+          }
         } else {
           val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
           withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) {
-            df.write.parquet(path3_0)
-            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+            // By default we should fail to write ancient datetime values.
+            var e = intercept[SparkException](df.write.parquet(path3_0))
+            assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
+            // By default we should fail to read ancient datetime values.
+            e = intercept[SparkException](spark.read.parquet(path2_4).collect())
+            assert(e.getCause.isInstanceOf[SparkUpgradeException])
+
+            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
+              df.write.mode("overwrite").parquet(path3_0)
+            }
+            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
               df.write.parquet(path3_0_rebase)
             }
           }
-          checkAnswer(
-            spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
-            1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+          // For Parquet files written by Spark 3.0, we know the writer info and don't need the
+          // config to guide the rebase behavior.
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) {
+            checkAnswer(
+              spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
+              1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+          }
         }
       }
     }
 
     Seq(false, true).foreach { vectorized =>
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-        withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") {
-          checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
-          checkReadMixedFiles(
-            "before_1582_timestamp_micros_v2_4.snappy.parquet",
-            "TIMESTAMP_MICROS",
-            "1001-01-01 01:02:03.123456")
-          checkReadMixedFiles(
-            "before_1582_timestamp_millis_v2_4.snappy.parquet",
-            "TIMESTAMP_MILLIS",
-            "1001-01-01 01:02:03.123")
-        }
+        checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
+        checkReadMixedFiles(
+          "before_1582_timestamp_micros_v2_4.snappy.parquet",
+          "TIMESTAMP_MICROS",
+          "1001-01-01 01:02:03.123456")
+        checkReadMixedFiles(
+          "before_1582_timestamp_millis_v2_4.snappy.parquet",
+          "TIMESTAMP_MILLIS",
+          "1001-01-01 01:02:03.123")
 
         // INT96 is a legacy timestamp format and we always rebase the seconds for it.
         checkAnswer(readResourceParquetFile(
@@ -948,7 +975,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
           withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
             withTempPath { dir =>
               val path = dir.getAbsolutePath
-              withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+              withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
                 Seq.tabulate(N)(_ => tsStr).toDF("tsS")
                   .select($"tsS".cast("timestamp").as("ts"))
                   .repartition(1)
@@ -960,10 +987,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
               Seq(false, true).foreach { vectorized =>
                 withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
                   // The file metadata indicates if it needs rebase or not, so we can always get the
-                  // correct result regardless of the "rebaseInRead" config.
-                  Seq(true, false).foreach { rebase =>
+                  // correct result regardless of the "rebase mode" config.
+                  Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
                     withSQLConf(
-                      SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+                      SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) {
                       checkAnswer(
                         spark.read.parquet(path),
                         Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
@@ -991,7 +1018,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     Seq(false, true).foreach { dictionaryEncoding =>
       withTempPath { dir =>
         val path = dir.getAbsolutePath
-        withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+        withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
           Seq.tabulate(N)(_ => "1001-01-01").toDF("dateS")
             .select($"dateS".cast("date").as("date"))
             .repartition(1)
@@ -1002,10 +1029,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
 
         Seq(false, true).foreach { vectorized =>
           withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-            // The file metadata indicates if it needs rebase or not, so we can always get
-            // the correct result regardless of the "rebaseInRead" config.
-            Seq(true, false).foreach { rebase =>
-              withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+            // The file metadata indicates if it needs rebase or not, so we can always get the
+            // correct result regardless of the "rebase mode" config.
+            Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
+              withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
                 checkAnswer(
                   spark.read.parquet(path),
                   Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 42b6862..cbea741 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 
@@ -151,7 +152,10 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
               Seq(false)
             }
             java8ApiConfValues.foreach { java8Api =>
-              withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+              withSQLConf(
+                SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+                SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString,
+                SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
                 val dataGenerator = RandomDataGenerator.forType(
                   dataType = dataType,
                   nullable = true,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org