Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/22 06:42:03 UTC

[hudi] 05/05: [HUDI-3934] Fix `Spark32HoodieParquetFileFormat` not being compatible w/ Spark 3.2.0 (#5378)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 25501c99e9e353bb8cf3757404cc0cd1835e03b3
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Apr 21 18:00:38 2022 -0700

    [HUDI-3934] Fix `Spark32HoodieParquetFileFormat` not being compatible w/ Spark 3.2.0 (#5378)
    
    - Because Spark 3.2.1 is not backwards-compatible with Spark 3.2.0, all of these incompatibilities have to be handled inside Spark32HoodieParquetFileFormat. This PR addresses that.
    
    Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
---
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |   8 +-
 .../apache/spark/sql/adapter/Spark3_1Adapter.scala |   4 +-
 ....scala => Spark31HoodieParquetFileFormat.scala} |  31 ++--
 .../parquet/Spark32DataSourceUtils.scala           |  77 ++++++++++
 .../parquet/Spark32HoodieParquetFileFormat.scala   | 157 +++++++++++++++++----
 5 files changed, 229 insertions(+), 48 deletions(-)

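For context on the approach taken in this patch: at runtime the reader gates on the Spark patch version and, when it cannot rely on the Spark 3.2.1+ constructor signatures, falls back to instantiating the affected classes reflectively. A minimal sketch of that pattern, with a hypothetical Widget class standing in for the real Spark types:

    object VersionGatedFactorySketch {
      // Stand-in for a Spark class whose ctor signature changed between patch releases.
      class Widget(val label: String, val count: Int)

      // Lexicographic comparison, matching how HoodieSparkUtils compares SPARK_VERSION strings.
      def gteq(version: String, threshold: String): Boolean = version >= threshold

      def createWidget(sparkVersion: String, args: Any*): Widget =
        if (gteq(sparkVersion, "3.2.1")) {
          // Newer runtime: the ctor can be invoked directly with the compiled signature.
          new Widget(args(0).asInstanceOf[String], args(1).asInstanceOf[Int])
        } else {
          // Older runtime: resolve the ctor reflectively so the call still links,
          // picking the widest ctor because declaration order is not guaranteed.
          val ctor = classOf[Widget].getConstructors.maxBy(_.getParameterCount)
          ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*).asInstanceOf[Widget]
        }
    }

The real code uses HoodieSparkUtils.gteqSpark3_2_1 as the gate and the same pick-by-parameter-count reflection in the helpers added to Spark32HoodieParquetFileFormat below.
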
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 54bc06bd76..7a8f8a1580 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -53,13 +53,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
 
   def isSpark3_1: Boolean = SPARK_VERSION.startsWith("3.1")
 
+  def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"
+
+  def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"
+
   def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2")
 
   def gteqSpark3_2: Boolean = SPARK_VERSION > "3.2"
 
-  def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"
-
-  def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"
+  def gteqSpark3_2_1: Boolean = SPARK_VERSION >= "3.2.1"
 
   def getMetaSchema: StructType = {
     StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
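These checks compare version strings lexicographically, which holds for the released Spark 3.x lines (three dot-separated, single-digit components); a hypothetical double-digit minor such as "3.10" would not sort correctly this way. A small sketch of how the checks behave under that assumption:

    object SparkVersionCheckSketch extends App {
      def gteqSpark3_2(v: String): Boolean = v > "3.2"       // any "3.2.x" compares greater than "3.2"
      def gteqSpark3_2_1(v: String): Boolean = v >= "3.2.1"  // true for 3.2.1+, false for 3.2.0

      assert(gteqSpark3_2("3.2.0") && gteqSpark3_2("3.2.1"))
      assert(!gteqSpark3_2_1("3.2.0") && gteqSpark3_2_1("3.2.1"))
      assert(!gteqSpark3_2("3.1.3"))   // still a 3.1.x build
    }

The new gteqSpark3_2_1 predicate is what Spark32HoodieParquetFileFormat uses further down to decide between the Spark 3.2.1+ constructors and the reflective Spark 3.2.0 fallback.
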
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index cd5cd9c82f..22431cb257 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark312HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31HoodieParquetFileFormat}
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils, SparkSession}
@@ -55,6 +55,6 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
   }
 
   override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
-    Some(new Spark312HoodieParquetFileFormat(appendPartitionValues))
+    Some(new Spark31HoodieParquetFileFormat(appendPartitionValues))
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
similarity index 95%
rename from hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
rename to hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
index 769373866f..e99850bef0 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
-import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils}
+import org.apache.hudi.common.util.{InternalSchemaCache, ReflectionUtils, StringUtils}
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet}
+import org.apache.spark.sql.execution.datasources.parquet.Spark31HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -61,7 +61,7 @@ import java.net.URI
  *   <li>Schema on-read</li>
  * </ol>
  */
-class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
   override def buildReaderWithPartitionValues(sparkSession: SparkSession,
                                               dataSchema: StructType,
@@ -154,8 +154,8 @@ class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: B
       val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
 
       val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
       val fileSchema = if (shouldUseInternalSchema) {
+        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
         InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
       } else {
@@ -223,13 +223,17 @@ class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: B
 
       // Clone new conf
       val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
-      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-      if (shouldUseInternalSchema) {
+      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+
         hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+
+        SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+      } else {
+        new java.util.HashMap()
       }
+
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
 
@@ -329,9 +333,7 @@ class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: B
   }
 }
 
-object Spark312HoodieParquetFileFormat {
-
-  val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
+object Spark31HoodieParquetFileFormat {
 
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
@@ -343,10 +345,11 @@ object Spark312HoodieParquetFileFormat {
     }
   }
 
-  private def createParquetFilters(arg: Any*): ParquetFilters = {
-    val clazz = Class.forName(PARQUET_FILTERS_CLASS_NAME, true, Thread.currentThread().getContextClassLoader)
-    val ctor = clazz.getConstructors.head
-    ctor.newInstance(arg.map(_.asInstanceOf[AnyRef]): _*).asInstanceOf[ParquetFilters]
+  private def createParquetFilters(args: Any*): ParquetFilters = {
+    // ParquetFilters bears a single ctor (in Spark 3.1)
+    val ctor = classOf[ParquetFilters].getConstructors.head
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[ParquetFilters]
   }
 
   private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
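Besides the rename to Spark31HoodieParquetFileFormat, the hunks above inline the commitInstantTime lookup into the branch that actually needs it and initialize typeChangeInfos directly from the if/else expression instead of mutating it inside the branch. The same refactor in isolation (here taken all the way to a val), with hypothetical types in place of Hudi's:

    import java.util.{HashMap => JHashMap, Map => JMap}

    object ValOverVarSketch {
      // Before: start from an empty map and mutate the local when the branch applies.
      def beforeStyle(useInternalSchema: Boolean, collect: () => JMap[Integer, String]): JMap[Integer, String] = {
        var infos: JMap[Integer, String] = new JHashMap()
        if (useInternalSchema) {
          infos = collect()
        }
        infos
      }

      // After: the branch itself is the expression, so the binding needs no mutation.
      def afterStyle(useInternalSchema: Boolean, collect: () => JMap[Integer, String]): JMap[Integer, String] =
        if (useInternalSchema) collect() else new JHashMap()
    }
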
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala
new file mode 100644
index 0000000000..6d1c76380f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.util.Utils
+
+object Spark32DataSourceUtils {
+
+  /**
+   * NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime
+   * compatibility against Spark 3.2.0
+   */
+  // scalastyle:off
+  def int96RebaseMode(lookupFileMeta: String => String,
+                      modeByConfig: String): LegacyBehaviorPolicy.Value = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return LegacyBehaviorPolicy.CORRECTED
+    }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the INT96 timestamp values.
+      // Files written by Spark 3.1 and later may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+  // scalastyle:on
+
+  /**
+   * NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime
+   * compatibility against Spark 3.2.0
+   */
+  // scalastyle:off
+  def datetimeRebaseMode(lookupFileMeta: String => String,
+                         modeByConfig: String): LegacyBehaviorPolicy.Value = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return LegacyBehaviorPolicy.CORRECTED
+    }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the datetime values.
+      // Files written by Spark 3.0 and later may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+  // scalastyle:on
+
+}
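The two helpers above are copies of the Spark 3.2.0 logic: given a lookupFileMeta function (in practice the footer's key-value metadata map) and the session-configured mode, they decide between LEGACY and CORRECTED rebasing based on which Spark version wrote the file. A usage sketch, assuming the caller already has the footer metadata map in hand:

    package org.apache.spark.sql.execution.datasources.parquet

    import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

    object RebaseModeUsageSketch {
      // `keyValueMetaData.get` returns null for absent keys, which is the contract
      // the helpers expect from `lookupFileMeta`.
      def resolveRebaseModes(keyValueMetaData: java.util.Map[String, String],
                             datetimeModeByConfig: String,
                             int96ModeByConfig: String): (LegacyBehaviorPolicy.Value, LegacyBehaviorPolicy.Value) = {
        val datetimeMode = Spark32DataSourceUtils.datetimeRebaseMode(keyValueMetaData.get, datetimeModeByConfig)
        val int96Mode = Spark32DataSourceUtils.int96RebaseMode(keyValueMetaData.get, int96ModeByConfig)
        (datetimeMode, int96Mode)
      }
    }

This mirrors how Spark32HoodieParquetFileFormat feeds footerFileMetaData.getKeyValueMetaData.get into these helpers on the Spark 3.2.0 code paths below.
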
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index f2a0a21df8..7135f19e95 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
@@ -37,10 +38,10 @@ import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
+import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -148,8 +149,8 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
 
       val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
       val fileSchema = if (shouldUseInternalSchema) {
+        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
         InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
       } else {
@@ -158,21 +159,38 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
 
       lazy val footerFileMetaData =
         ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-      val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
-        footerFileMetaData.getKeyValueMetaData.get,
-        datetimeRebaseModeInRead)
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(
-          parquetSchema,
-          pushDownDate,
-          pushDownTimestamp,
-          pushDownDecimal,
-          pushDownStringStartWith,
-          pushDownInFilterThreshold,
-          isCaseSensitive,
-          datetimeRebaseSpec)
+        val parquetFilters = if (HoodieSparkUtils.gteqSpark3_2_1) {
+          // NOTE: Below code can only be compiled against Spark >= 3.2.1 and won't
+          //       compile against Spark 3.2.0; the enclosing gteqSpark3_2_1 check
+          //       keeps this branch from running on a Spark 3.2.0 runtime
+          val datetimeRebaseSpec =
+            DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          new ParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseSpec)
+        } else {
+          // Spark 3.2.0
+          val datetimeRebaseMode =
+            Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseMode)
+        }
         filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -198,21 +216,21 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
           None
         }
 
-      val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
-        footerFileMetaData.getKeyValueMetaData.get,
-        int96RebaseModeInRead)
-
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
 
       // Clone new conf
       val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
-      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-      if (shouldUseInternalSchema) {
+      val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+
         hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+
+        SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+      } else {
+        new java.util.HashMap()
       }
+
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
 
@@ -225,6 +243,10 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       if (enableVectorizedReader) {
         val vectorizedReader =
           if (shouldUseInternalSchema) {
+            val int96RebaseSpec =
+              DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+            val datetimeRebaseSpec =
+              DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
             new Spark32HoodieVectorizedParquetRecordReader(
               convertTz.orNull,
               datetimeRebaseSpec.mode.toString,
@@ -234,7 +256,14 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
               enableOffHeapColumnVector && taskContext.isDefined,
               capacity,
               typeChangeInfos)
-          } else {
+          } else if (HoodieSparkUtils.gteqSpark3_2_1) {
+            // NOTE: Below code can only be compiled against Spark >= 3.2.1 and won't
+            //       compile against Spark 3.2.0; the enclosing gteqSpark3_2_1 check
+            //       keeps this branch from running on a Spark 3.2.0 runtime
+            val int96RebaseSpec =
+              DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+            val datetimeRebaseSpec =
+              DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
             new VectorizedParquetRecordReader(
               convertTz.orNull,
               datetimeRebaseSpec.mode.toString,
@@ -243,7 +272,20 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
               int96RebaseSpec.timeZone,
               enableOffHeapColumnVector && taskContext.isDefined,
               capacity)
+          } else {
+            // Spark 3.2.0
+            val datetimeRebaseMode =
+              Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+            val int96RebaseMode =
+              Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+            createVectorizedParquetRecordReader(
+              convertTz.orNull,
+              datetimeRebaseMode.toString,
+              int96RebaseMode.toString,
+              enableOffHeapColumnVector && taskContext.isDefined,
+              capacity)
           }
+
         // SPARK-37089: We cannot register a task completion listener to close this iterator here
         // because downstream exec nodes have already registered their listeners. Since listeners
         // are executed in reverse order of registration, a listener registered here would close the
@@ -279,12 +321,32 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
-        // ParquetRecordReader returns InternalRow
-        val readSupport = new ParquetReadSupport(
-          convertTz,
-          enableVectorizedReader = false,
-          datetimeRebaseSpec,
-          int96RebaseSpec)
+        val readSupport = if (HoodieSparkUtils.gteqSpark3_2_1) {
+          // ParquetRecordReader returns InternalRow
+          // NOTE: Below code can only be compiled against Spark >= 3.2.1 and won't
+          //       compile against Spark 3.2.0; the enclosing gteqSpark3_2_1 check
+          //       keeps this branch from running on a Spark 3.2.0 runtime
+          val int96RebaseSpec =
+            DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+          val datetimeRebaseSpec =
+            DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          new ParquetReadSupport(
+            convertTz,
+            enableVectorizedReader = false,
+            datetimeRebaseSpec,
+            int96RebaseSpec)
+        } else {
+          val datetimeRebaseMode =
+            Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          val int96RebaseMode =
+            Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+          createParquetReadSupport(
+            convertTz,
+            /* enableVectorizedReader = */ false,
+            datetimeRebaseMode,
+            int96RebaseMode)
+        }
+
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
           new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -332,10 +394,47 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       }
     }
   }
+
 }
 
 object Spark32HoodieParquetFileFormat {
 
+  /**
+   * NOTE: This method is specific to Spark 3.2.0
+   */
+  private def createParquetFilters(args: Any*): ParquetFilters = {
+    // NOTE: ParquetFilters ctor args contain a Scala enum, therefore we can't look the
+    //       ctor up by arg types and instead have to pick it by parameter count;
+    //       the ctor declaration order is not guaranteed
+    val ctor = classOf[ParquetFilters].getConstructors.maxBy(_.getParameterCount)
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[ParquetFilters]
+  }
+
+  /**
+   * NOTE: This method is specific to Spark 3.2.0
+   */
+  private def createParquetReadSupport(args: Any*): ParquetReadSupport = {
+    // NOTE: ParquetReadSupport ctor args contain a Scala enum, therefore we can't look the
+    //       ctor up by arg types and instead have to pick it by parameter count;
+    //       the ctor declaration order is not guaranteed
+    val ctor = classOf[ParquetReadSupport].getConstructors.maxBy(_.getParameterCount)
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[ParquetReadSupport]
+  }
+
+  /**
+   * NOTE: This method is specific to Spark 3.2.0
+   */
+  private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = {
+    // NOTE: VectorizedParquetRecordReader's ctor signature differs between Spark 3.2.0
+    //       and 3.2.1, so we can't reference it directly and instead pick the widest
+    //       ctor reflectively; the ctor declaration order is not guaranteed
+    val ctor = classOf[VectorizedParquetRecordReader].getConstructors.maxBy(_.getParameterCount)
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[VectorizedParquetRecordReader]
+  }
+
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
     if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {