You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/04/21 15:27:36 UTC

[spark] branch master updated: [SPARK-31361][SQL] Rebase datetime in parquet/avro according to file metadata

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a5ebbac  [SPARK-31361][SQL] Rebase datetime in parquet/avro according to file metadata
a5ebbac is described below

commit a5ebbacf538cd78f3edc81542a8514c60d340109
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Apr 22 00:26:23 2020 +0900

    [SPARK-31361][SQL] Rebase datetime in parquet/avro according to file metadata
    
    ### What changes were proposed in this pull request?
    
    This PR adds a new parquet/avro file metadata key: `org.apache.spark.legacyDatetime`. It indicates that the file was written with the "rebaseInWrite" config enabled, and that Spark needs to rebase the datetime values when reading it.
    
    This lets Spark decide more precisely whether to rebase:
    1. If we don't know which Spark version wrote the file, rebase if the "rebaseInRead" config is true.
    2. If the file was written by Spark 2.4 or earlier, always rebase.
    3. If the file was written by Spark 3.0 or later, rebase only if the `org.apache.spark.legacyDatetime` key exists in the file metadata.
    
    ### Why are the changes needed?
    
    It's very easy to end up with mixed-calendar parquet/avro files: e.g. a user upgrades to Spark 3.0 and writes some parquet files to an existing directory, and only then realizes that the directory contains legacy datetime values before 1582. By then it's too late, and the user has to find all the legacy files manually and read them separately.
    
    To support mixed-calendar parquet/avro files, we need to decide whether to rebase based on the file metadata.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Updated test
    
    Closes #28137 from cloud-fan/datetime.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  12 ++-
 .../org/apache/spark/sql/avro/AvroFileFormat.scala |  11 ++-
 .../apache/spark/sql/avro/AvroOutputWriter.scala   |  17 +++-
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  14 ++-
 .../sql/v2/avro/AvroPartitionReaderFactory.scala   |  10 +-
 .../sql/avro/AvroCatalystDataConversionSuite.scala |   2 +-
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 103 ++++++++++++++++-----
 .../parquet/VectorizedColumnReader.java            |   6 +-
 .../parquet/VectorizedParquetRecordReader.java     |  18 +++-
 .../execution/datasources/DataSourceUtils.scala    |  17 ++++
 .../datasources/parquet/ParquetFileFormat.scala    |  13 ++-
 .../datasources/parquet/ParquetReadSupport.scala   |  15 +--
 .../parquet/ParquetRecordMaterializer.scala        |  10 +-
 .../datasources/parquet/ParquetRowConverter.scala  |  10 +-
 .../datasources/parquet/ParquetWriteSupport.scala  |   6 +-
 .../v2/parquet/ParquetPartitionReaderFactory.scala |  26 ++++--
 .../main/scala/org/apache/spark/sql/package.scala  |   6 ++
 .../benchmark/DataSourceReadBenchmark.scala        |   6 +-
 .../datasources/parquet/ParquetEncodingSuite.scala |   6 +-
 .../datasources/parquet/ParquetIOSuite.scala       |  89 ++++++++++++++----
 .../datasources/parquet/ParquetTest.scala          |   7 +-
 21 files changed, 299 insertions(+), 105 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 8d78cf4..f32fe46 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -41,12 +41,14 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * A deserializer to deserialize data in avro format to data in catalyst format.
  */
-class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
-  private lazy val decimalConversions = new DecimalConversion()
+class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) {
+
+  def this(rootAvroType: Schema, rootCatalystType: DataType) {
+    this(rootAvroType, rootCatalystType,
+      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ))
+  }
 
-  // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
-  private val rebaseDateTime =
-    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
+  private lazy val decimalConversions = new DecimalConversion()
 
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 123669b..e69c95b 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -34,7 +34,8 @@ import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -123,8 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat
         reader.sync(file.start)
         val stop = file.start + file.length
 
-        val deserializer =
-          new AvroDeserializer(userProvidedSchema.getOrElse(reader.getSchema), requiredSchema)
+        val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
+          reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
+          SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
+        }
+        val deserializer = new AvroDeserializer(
+          userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, rebaseDateTime)
 
         new Iterator[InternalRow] {
           private[this] var completed = false
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
index 2cfa3a4..82a5680 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
@@ -29,9 +29,10 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
 
 import org.apache.spark.SPARK_VERSION_SHORT
-import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
+import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
@@ -41,16 +42,24 @@ private[avro] class AvroOutputWriter(
     schema: StructType,
     avroSchema: Schema) extends OutputWriter {
 
+  // Whether to rebase datetimes from Gregorian to Julian calendar in write
+  private val rebaseDateTime: Boolean =
+    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
+
   // The input rows will never be null.
-  private lazy val serializer = new AvroSerializer(schema, avroSchema, nullable = false)
+  private lazy val serializer =
+    new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime)
 
   /**
    * Overrides the couple of methods responsible for generating the output streams / files so
    * that the data can be correctly partitioned
    */
   private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = {
-    val sparkVersion = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT).asJava
-    new SparkAvroKeyOutputFormat(sparkVersion) {
+    val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ {
+      if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None
+    }
+
+    new SparkAvroKeyOutputFormat(fileMeta.asJava) {
 
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         new Path(path)
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index daa9c7d..c87249e 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -42,12 +42,16 @@ import org.apache.spark.sql.types._
 /**
  * A serializer to serialize data in catalyst format to data in avro format.
  */
-class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
-  extends Logging {
+class AvroSerializer(
+    rootCatalystType: DataType,
+    rootAvroType: Schema,
+    nullable: Boolean,
+    rebaseDateTime: Boolean) extends Logging {
 
-  // Whether to rebase datetimes from Gregorian to Julian calendar in write
-  private val rebaseDateTime: Boolean =
-    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
+  def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
+    this(rootCatalystType, rootAvroType, nullable,
+      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE))
+  }
 
   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
index 8230dba..712aec6 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.PartitionReader
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader, FilePartitionReaderFactory, PartitionReaderWithPartitionValues}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -88,8 +88,12 @@ case class AvroPartitionReaderFactory(
       reader.sync(partitionedFile.start)
       val stop = partitionedFile.start + partitionedFile.length
 
-      val deserializer =
-        new AvroDeserializer(userProvidedSchema.getOrElse(reader.getSchema), readDataSchema)
+      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
+      }
+      val deserializer = new AvroDeserializer(
+        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, rebaseDateTime)
 
       val fileReader = new PartitionReader[InternalRow] {
         private[this] var completed = false
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
index c8a1f67..64d790b 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       """.stripMargin
     val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
     val dataType = SchemaConverters.toSqlType(avroSchema).dataType
-    val deserializer = new AvroDeserializer(avroSchema, dataType)
+    val deserializer = new AvroDeserializer(avroSchema, dataType, rebaseDateTime = false)
 
     def checkDeserialization(data: GenericData.Record, expected: Any): Unit = {
       assert(checkResult(
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index a5224fd1..3e754f0 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -84,9 +84,8 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }
 
-  private def readResourceAvroFile(name: String): DataFrame = {
-    val url = Thread.currentThread().getContextClassLoader.getResource(name)
-    spark.read.format("avro").load(url.toString)
+  private def getResourceAvroFilePath(name: String): String = {
+    Thread.currentThread().getContextClassLoader.getResource(name).toString
   }
 
   test("resolve avro data source") {
@@ -1530,16 +1529,50 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
+    // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
+    def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = {
+      withTempPaths(2) { paths =>
+        paths.foreach(_.delete())
+        val path2_4 = getResourceAvroFilePath(fileName)
+        val path3_0 = paths(0).getCanonicalPath
+        val path3_0_rebase = paths(1).getCanonicalPath
+        if (dt == "date") {
+          val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
+          df.write.format("avro").save(path3_0)
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
+            df.write.format("avro").save(path3_0_rebase)
+          }
+          checkAnswer(
+            spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+            1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+        } else {
+          val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
+          val avroSchema =
+            s"""
+              |{
+              |  "type" : "record",
+              |  "name" : "test_schema",
+              |  "fields" : [
+              |    {"name": "ts", "type": {"type": "long", "logicalType": "$dt"}}
+              |  ]
+              |}""".stripMargin
+          df.write.format("avro").option("avroSchema", avroSchema).save(path3_0)
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
+            df.write.format("avro").option("avroSchema", avroSchema).save(path3_0_rebase)
+          }
+          checkAnswer(
+            spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+            1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+        }
+      }
+    }
+
     withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
-      checkAnswer(
-        readResourceAvroFile("before_1582_date_v2_4.avro"),
-        Row(java.sql.Date.valueOf("1001-01-01")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+      checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01")
+      checkReadMixedFiles(
+        "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456")
+      checkReadMixedFiles(
+        "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124")
     }
   }
 
@@ -1554,10 +1587,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
           .write.format("avro")
           .save(path)
       }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+
+      // The file metadata indicates if it needs rebase or not, so we can always get the correct
+      // result regardless of the "rebaseInRead" config.
+      Seq(true, false).foreach { rebase =>
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+          checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+        }
       }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") {
+
+      // Force to not rebase to prove the written datetime values are rebased and we will get
+      // wrong result if we don't rebase while reading.
+      withSQLConf("spark.test.forceNoRebase" -> "true") {
         checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
       }
     }
@@ -1589,12 +1630,20 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
             .format("avro")
             .save(path)
         }
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(rebased)))
+
+        // The file metadata indicates if it needs rebase or not, so we can always get the correct
+        // result regardless of the "rebaseInRead" config.
+        Seq(true, false).foreach { rebase =>
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+            checkAnswer(
+              spark.read.schema("ts timestamp").format("avro").load(path),
+              Row(Timestamp.valueOf(rebased)))
+          }
         }
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") {
+
+        // Force to not rebase to prove the written datetime values are rebased and we will get
+        // wrong result if we don't rebase while reading.
+        withSQLConf("spark.test.forceNoRebase" -> "true") {
           checkAnswer(
             spark.read.schema("ts timestamp").format("avro").load(path),
             Row(Timestamp.valueOf(nonRebased)))
@@ -1612,10 +1661,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
           .write.format("avro")
           .save(path)
       }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+
+      // The file metadata indicates if it needs rebase or not, so we can always get the correct
+      // result regardless of the "rebaseInRead" config.
+      Seq(true, false).foreach { rebase =>
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+          checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+        }
       }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") {
+
+      // Force to not rebase to prove the written datetime values are rebased and we will get
+      // wrong result if we don't rebase while reading.
+      withSQLConf("spark.test.forceNoRebase" -> "true") {
         checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
       }
     }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index c50619a..cfb873f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
 
@@ -109,7 +108,8 @@ public class VectorizedColumnReader {
       ColumnDescriptor descriptor,
       OriginalType originalType,
       PageReader pageReader,
-      ZoneId convertTz) throws IOException {
+      ZoneId convertTz,
+      boolean rebaseDateTime) throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
     this.convertTz = convertTz;
@@ -132,7 +132,7 @@ public class VectorizedColumnReader {
     if (totalValueCount == 0) {
       throw new IOException("totalValueCount == 0");
     }
-    this.rebaseDateTime = SQLConf.get().parquetRebaseDateTimeInReadEnabled();
+    this.rebaseDateTime = rebaseDateTime;
   }
 
   /**
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 7306709..c9590b9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -86,7 +86,12 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    * The timezone that timestamp INT96 values should be converted to. Null if no conversion. Here to
    * workaround incompatibilities between different engines when writing timestamp values.
    */
-  private ZoneId convertTz = null;
+  private final ZoneId convertTz;
+
+  /**
+   * true if need to rebase date/timestamp from Julian to Proleptic Gregorian calendar.
+   */
+  private final boolean rebaseDateTime;
 
   /**
    * columnBatch object that is used for batch decoding. This is created on first use and triggers
@@ -116,12 +121,19 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
   private final MemoryMode MEMORY_MODE;
 
-  public VectorizedParquetRecordReader(ZoneId convertTz, boolean useOffHeap, int capacity) {
+  public VectorizedParquetRecordReader(
+    ZoneId convertTz, boolean rebaseDateTime, boolean useOffHeap, int capacity) {
     this.convertTz = convertTz;
+    this.rebaseDateTime = rebaseDateTime;
     MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
     this.capacity = capacity;
   }
 
+  // For test only.
+  public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) {
+    this(null, false, useOffHeap, capacity);
+  }
+
   /**
    * Implementation of RecordReader API.
    */
@@ -309,7 +321,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     for (int i = 0; i < columns.size(); ++i) {
       if (missingColumns[i]) continue;
       columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(),
-        pages.getPageReader(columns.get(i)), convertTz);
+        pages.getPageReader(columns.get(i)), convertTz, rebaseDateTime);
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index bd56635..b19de6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -21,8 +21,11 @@ import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 
 object DataSourceUtils {
@@ -64,4 +67,18 @@ object DataSourceUtils {
 
   private[sql] def isDataFile(fileName: String) =
     !(fileName.startsWith("_") || fileName.startsWith("."))
+
+  def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return Some(false)
+    }
+    // If there is no version, we return None and let the caller side to decide.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the datetime values.
+      // Files written by Spark 3.0 and later may also need the rebase if they were written with
+      // the "rebaseInWrite" config enabled.
+      version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 29dbd8d..c6d9ddf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -300,6 +300,11 @@ class ParquetFileFormat
           None
         }
 
+      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
+        footerFileMetaData.getKeyValueMetaData.get).getOrElse {
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ)
+      }
+
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
@@ -312,7 +317,10 @@ class ParquetFileFormat
       val taskContext = Option(TaskContext.get())
       if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
-          convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+          convertTz.orNull,
+          rebaseDateTime,
+          enableOffHeapColumnVector && taskContext.isDefined,
+          capacity)
         val iter = new RecordReaderIterator(vectorizedReader)
         // SPARK-23457 Register a task completion listener before `initialization`.
         taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
@@ -328,7 +336,8 @@ class ParquetFileFormat
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns InternalRow
-        val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
+        val readSupport = new ParquetReadSupport(
+          convertTz, enableVectorizedReader = false, rebaseDateTime)
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
           new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index c05ecf1..28165e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -50,16 +50,18 @@ import org.apache.spark.sql.types._
  * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
  */
-class ParquetReadSupport(val convertTz: Option[ZoneId],
-    enableVectorizedReader: Boolean)
+class ParquetReadSupport(
+    val convertTz: Option[ZoneId],
+    enableVectorizedReader: Boolean,
+    rebaseDateTime: Boolean)
   extends ReadSupport[InternalRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
   def this() {
     // We need a zero-arg constructor for SpecificParquetRecordReaderBase.  But that is only
-    // used in the vectorized reader, where we get the convertTz value directly, and the value here
-    // is ignored.
-    this(None, enableVectorizedReader = true)
+    // used in the vectorized reader, where we get the convertTz/rebaseDateTime value directly,
+    // and the values here are ignored.
+    this(None, enableVectorizedReader = true, rebaseDateTime = false)
   }
 
   /**
@@ -127,7 +129,8 @@ class ParquetReadSupport(val convertTz: Option[ZoneId],
       parquetRequestedSchema,
       ParquetReadSupport.expandUDT(catalystRequestedSchema),
       new ParquetToSparkSchemaConverter(conf),
-      convertTz)
+      convertTz,
+      rebaseDateTime)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index 5622169..ec03713 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -31,16 +31,20 @@ import org.apache.spark.sql.types.StructType
  * @param parquetSchema Parquet schema of the records to be read
  * @param catalystSchema Catalyst schema of the rows to be constructed
  * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
+ * @param convertTz the optional time zone to convert to int96 data
+ * @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian
+ *                       calendar
  */
 private[parquet] class ParquetRecordMaterializer(
     parquetSchema: MessageType,
     catalystSchema: StructType,
     schemaConverter: ParquetToSparkSchemaConverter,
-    convertTz: Option[ZoneId])
+    convertTz: Option[ZoneId],
+    rebaseDateTime: Boolean)
   extends RecordMaterializer[InternalRow] {
 
-  private val rootConverter =
-    new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater)
+  private val rootConverter = new ParquetRowConverter(
+    schemaConverter, parquetSchema, catalystSchema, convertTz, rebaseDateTime, NoopUpdater)
 
   override def getCurrentRecord: InternalRow = rootConverter.currentRecord
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 6072db1..8376b7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -120,7 +120,9 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
  *        types should have been expanded.
- * @param convertTz the optional time zone to convert to for int96 data
+ * @param convertTz the optional time zone to convert int96 data to
+ * @param rebaseDateTime true if date/timestamp values need to be rebased from the Julian to the
+ *                       Proleptic Gregorian calendar
  * @param updater An updater which propagates converted field values to the parent container
  */
 private[parquet] class ParquetRowConverter(
@@ -128,12 +130,10 @@ private[parquet] class ParquetRowConverter(
     parquetType: GroupType,
     catalystType: StructType,
     convertTz: Option[ZoneId],
+    rebaseDateTime: Boolean,
     updater: ParentContainerUpdater)
   extends ParquetGroupConverter(updater) with Logging {
 
-  // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
-  private val rebaseDateTime = SQLConf.get.parquetRebaseDateTimeInReadEnabled
-
   assert(
     parquetType.getFieldCount <= catalystType.length,
     s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
@@ -386,7 +386,7 @@ private[parquet] class ParquetRowConverter(
           }
         }
         new ParquetRowConverter(
-          schemaConverter, parquetType.asGroupType(), t, convertTz, wrappedUpdater)
+          schemaConverter, parquetType.asGroupType(), t, convertTz, rebaseDateTime, wrappedUpdater)
 
       case t =>
         throw new RuntimeException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 7317a25..b135611 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -31,7 +31,7 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer}
 
 import org.apache.spark.SPARK_VERSION_SHORT
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
+import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -103,7 +103,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
     val metadata = Map(
       SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT,
       ParquetReadSupport.SPARK_METADATA_KEY -> schemaString
-    ).asJava
+    ) ++ (if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None)
 
     logInfo(
       s"""Initialized Parquet WriteSupport with Catalyst schema:
@@ -112,7 +112,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
          |$messageType
        """.stripMargin)
 
-    new WriteContext(messageType, metadata)
+    new WriteContext(messageType, metadata.asJava)
   }
 
   override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 047bc74..1925fa1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
-import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator}
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.internal.SQLConf
@@ -117,7 +117,7 @@ case class ParquetPartitionReaderFactory(
       file: PartitionedFile,
       buildReaderFunc: (
         ParquetInputSplit, InternalRow, TaskAttemptContextImpl, Option[FilterPredicate],
-          Option[ZoneId]) => RecordReader[Void, T]): RecordReader[Void, T] = {
+          Option[ZoneId], Boolean) => RecordReader[Void, T]): RecordReader[Void, T] = {
     val conf = broadcastedConf.value.value
 
     val filePath = new Path(new URI(file.filePath))
@@ -169,8 +169,12 @@ case class ParquetPartitionReaderFactory(
     if (pushed.isDefined) {
       ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
     }
-    val reader =
-      buildReaderFunc(split, file.partitionValues, hadoopAttemptContext, pushed, convertTz)
+    val rebaseDatetime = DataSourceUtils.needRebaseDateTime(
+      footerFileMetaData.getKeyValueMetaData.get).getOrElse {
+      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ)
+    }
+    val reader = buildReaderFunc(
+      split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, rebaseDatetime)
     reader.initialize(split, hadoopAttemptContext)
     reader
   }
@@ -184,11 +188,13 @@ case class ParquetPartitionReaderFactory(
       partitionValues: InternalRow,
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],
-      convertTz: Option[ZoneId]): RecordReader[Void, InternalRow] = {
+      convertTz: Option[ZoneId],
+      needDateTimeRebase: Boolean): RecordReader[Void, InternalRow] = {
     logDebug(s"Falling back to parquet-mr")
     val taskContext = Option(TaskContext.get())
     // ParquetRecordReader returns InternalRow
-    val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false)
+    val readSupport = new ParquetReadSupport(
+      convertTz, enableVectorizedReader = false, needDateTimeRebase)
     val reader = if (pushed.isDefined && enableRecordFilter) {
       val parquetFilter = FilterCompat.get(pushed.get, null)
       new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -213,10 +219,14 @@ case class ParquetPartitionReaderFactory(
       partitionValues: InternalRow,
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],
-      convertTz: Option[ZoneId]): VectorizedParquetRecordReader = {
+      convertTz: Option[ZoneId],
+      rebaseDatetime: Boolean): VectorizedParquetRecordReader = {
     val taskContext = Option(TaskContext.get())
     val vectorizedReader = new VectorizedParquetRecordReader(
-      convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+      convertTz.orNull,
+      rebaseDatetime,
+      enableOffHeapColumnVector && taskContext.isDefined,
+      capacity)
     val iter = new RecordReaderIterator(vectorizedReader)
     // SPARK-23457 Register a task completion listener before `initialization`.
     taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 58de675..c039701 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -54,4 +54,10 @@ package object sql {
    * Note that Hive table property `spark.sql.create.version` also has Spark version.
    */
   private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"
+
+  /**
+   * Parquet/Avro file metadata key to indicate that the file was written with legacy datetime
+   * values.
+   */
+  private[sql] val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDateTime"
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index a084bec..d29c5e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -169,7 +169,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
 
           files.map(_.asInstanceOf[String]).foreach { p =>
             val reader = new VectorizedParquetRecordReader(
-              null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
+              enableOffHeapColumnVector, vectorizedReaderBatchSize)
             try {
               reader.initialize(p, ("id" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -203,7 +203,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
 
           files.map(_.asInstanceOf[String]).foreach { p =>
             val reader = new VectorizedParquetRecordReader(
-              null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
+              enableOffHeapColumnVector, vectorizedReaderBatchSize)
             try {
               reader.initialize(p, ("id" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -458,7 +458,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
           var sum = 0
           files.map(_.asInstanceOf[String]).foreach { p =>
             val reader = new VectorizedParquetRecordReader(
-              null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
+              enableOffHeapColumnVector, vectorizedReaderBatchSize)
             try {
               reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
               val batch = reader.resultBatch()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index 6d681af..fbfedf0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -42,7 +42,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
 
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
-          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+          conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         reader.initialize(file.asInstanceOf[String], null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
@@ -69,7 +69,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
 
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
-          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+          conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         reader.initialize(file.asInstanceOf[String], null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
@@ -100,7 +100,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
 
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
-          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+          conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         reader.initialize(file, null /* set columns to null to project all columns */)
         val column = reader.resultBatch().column(0)
         assert(reader.nextBatch())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index f901ce1..239db7d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.{Date, Timestamp}
+import java.time._
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -720,7 +721,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       {
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
-          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+          conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         try {
           reader.initialize(file, null)
           val result = mutable.ArrayBuffer.empty[(Int, String)]
@@ -739,7 +740,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       {
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
-          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+          conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         try {
           reader.initialize(file, ("_2" :: Nil).asJava)
           val result = mutable.ArrayBuffer.empty[(String)]
@@ -757,7 +758,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       {
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
-          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+          conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         try {
           reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
           val result = mutable.ArrayBuffer.empty[(String, Int)]
@@ -776,7 +777,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       {
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
-          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+          conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         try {
           reader.initialize(file, List[String]().asJava)
           var result = 0
@@ -817,7 +818,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         val schema = StructType(StructField("pcol", dt) :: Nil)
         val conf = sqlContext.conf
         val vectorizedReader = new VectorizedParquetRecordReader(
-          null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+          conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         val partitionValues = new GenericInternalRow(Array(v))
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
 
@@ -882,19 +883,52 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") {
+    // Test reading the existing 2.4 file together with new 3.0 files written with rebase off/on.
+    def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = {
+      withTempPaths(2) { paths =>
+        paths.foreach(_.delete())
+        val path2_4 = getResourceParquetFilePath("test-data/" + fileName)
+        val path3_0 = paths(0).getCanonicalPath
+        val path3_0_rebase = paths(1).getCanonicalPath
+        if (dt == "date") {
+          val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
+          df.write.parquet(path3_0)
+          withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+            df.write.parquet(path3_0_rebase)
+          }
+          checkAnswer(
+            spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
+            1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+        } else {
+          val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
+          withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) {
+            df.write.parquet(path3_0)
+            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+              df.write.parquet(path3_0_rebase)
+            }
+          }
+          checkAnswer(
+            spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
+            1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+        }
+      }
+    }
+
     Seq(false, true).foreach { vectorized =>
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
         withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") {
-          checkAnswer(
-            readResourceParquetFile("test-data/before_1582_date_v2_4.snappy.parquet"),
-            Row(java.sql.Date.valueOf("1001-01-01")))
-          checkAnswer(readResourceParquetFile(
-            "test-data/before_1582_timestamp_micros_v2_4.snappy.parquet"),
-            Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-          checkAnswer(readResourceParquetFile(
-            "test-data/before_1582_timestamp_millis_v2_4.snappy.parquet"),
-            Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123")))
+          checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
+          checkReadMixedFiles(
+            "before_1582_timestamp_micros_v2_4.snappy.parquet",
+            "TIMESTAMP_MICROS",
+            "1001-01-01 01:02:03.123456")
+          checkReadMixedFiles(
+            "before_1582_timestamp_millis_v2_4.snappy.parquet",
+            "TIMESTAMP_MILLIS",
+            "1001-01-01 01:02:03.123")
         }
+
+        // INT96 is a legacy timestamp format and we always rebase the seconds for it.
         checkAnswer(readResourceParquetFile(
           "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
           Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
@@ -918,10 +952,17 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
                 .write
                 .parquet(path)
             }
-            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") {
-              checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(tsStr)))
+            // The file metadata indicates if it needs rebase or not, so we can always get the
+            // correct result regardless of the Parquet "rebaseInRead" config.
+            Seq(true, false).foreach { rebase =>
+              withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+                checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(tsStr)))
+              }
             }
-            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "false") {
+
+            // Force to not rebase to prove the written datetime values are rebased and we will get
+            // a wrong result if we don't rebase while reading.
+            withSQLConf("spark.test.forceNoRebase" -> "true") {
               checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(nonRebased)))
             }
           }
@@ -939,10 +980,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
           .write
           .parquet(path)
       }
-      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") {
-        checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-01")))
+
+      // The file metadata indicates if it needs rebase or not, so we can always get the correct
+      // result regardless of the Parquet "rebaseInRead" config.
+      Seq(true, false).foreach { rebase =>
+        withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+          checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-01")))
+        }
       }
-      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "false") {
+
+      // Force to not rebase to prove the written datetime values are rebased and we will get
+      // a wrong result if we don't rebase while reading.
+      withSQLConf("spark.test.forceNoRebase" -> "true") {
         checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-07")))
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index f2dbc53..c833d5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -156,7 +156,10 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
   }
 
   protected def readResourceParquetFile(name: String): DataFrame = {
-    val url = Thread.currentThread().getContextClassLoader.getResource(name)
-    spark.read.parquet(url.toString)
+    spark.read.parquet(getResourceParquetFilePath(name))
+  }
+
+  protected def getResourceParquetFilePath(name: String): String = {
+    Thread.currentThread().getContextClassLoader.getResource(name).toString
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org