You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/20 11:30:34 UTC

[hudi] branch master updated: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns (#5364)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f7544e23ac [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns (#5364)
f7544e23ac is described below

commit f7544e23ac6899b2d8b28a01c79caad1facd1379
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Wed Apr 20 04:30:27 2022 -0700

    [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns (#5364)
    
     - Scaffolded `Spark24HoodieParquetFileFormat` extending `ParquetFileFormat` and overriding the behavior of adding partition columns to every row
     - Amended `SparkAdapter`s `createHoodieParquetFileFormat` API to be able to configure whether to append partition values or not
     - Fallback to append partition values in cases when the source columns are not persisted in data-file
     - Fixing HoodieBaseRelation incorrectly handling mandatory columns
---
 .../spark/sql/HoodieCatalystExpressionUtils.scala  |  20 +-
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   4 +-
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java | 112 +++++
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  80 +---
 .../hudi/common/model/HoodiePartitionMetadata.java |   5 +-
 .../hudi/common/table/HoodieTableConfig.java       |   2 +-
 .../hudi/common/table/TableSchemaResolver.java     |  17 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   2 +-
 .../hudi/common/table/TestTableSchemaResolver.java |   3 +-
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  29 +-
 ...org.apache.spark.sql.sources.DataSourceRegister |   2 +-
 .../org/apache/hudi/BaseFileOnlyRelation.scala     |  36 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 208 +++++----
 .../org/apache/hudi/HoodieDataSourceHelper.scala   |  20 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   4 +-
 .../org/apache/hudi/IncrementalRelation.scala      |   5 +-
 .../hudi/MergeOnReadIncrementalRelation.scala      |  12 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  13 +-
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |   3 +
 ...eFormat.scala => HoodieParquetFileFormat.scala} |  30 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |  28 +-
 .../hudi/functional/TestCOWDataSourceStorage.scala |   1 -
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |   6 +-
 .../parquet/Spark24HoodieParquetFileFormat.scala   | 229 ++++++++++
 .../apache/spark/sql/adapter/Spark3_1Adapter.scala |  22 +-
 .../parquet/Spark312HoodieParquetFileFormat.scala  | 507 +++++++++++----------
 .../apache/spark/sql/adapter/Spark3_2Adapter.scala |  13 +-
 .../parquet/Spark32HoodieParquetFileFormat.scala   | 449 ++++++++++--------
 28 files changed, 1166 insertions(+), 696 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
index fe30f61b92..a3b9c210b9 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
@@ -18,12 +18,30 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SubqueryExpression, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.types.StructType
 
 trait HoodieCatalystExpressionUtils {
 
+  /**
+   * Generates instance of [[UnsafeProjection]] projecting row of one [[StructType]] into another [[StructType]]
+   *
+   * NOTE: No safety checks are executed to validate that this projection is actually feasible,
+   *       it's up to the caller to make sure that such projection is possible.
+   *
+   * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if
+   *       B is a subset of A
+   */
+  def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
+    val attrs = from.toAttributes
+    val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
+    val targetExprs = to.fields.map(f => attrsMap(f.name))
+
+    GenerateUnsafeProjection.generate(targetExprs, attrs)
+  }
+
   /**
    * Parses and resolves expression against the attributes of the given table schema.
    *
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index d8ed173547..a97743e62f 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -177,7 +177,7 @@ trait SparkAdapter extends Serializable {
   def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan]
 
   /**
-    * Create hoodie parquet file format.
+    * Create instance of [[ParquetFileFormat]]
     */
-  def createHoodieParquetFileFormat(): Option[ParquetFileFormat]
+  def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat]
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
new file mode 100644
index 0000000000..dd14dca671
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+public class AvroSchemaUtils {
+
+  private AvroSchemaUtils() {}
+
+  /**
+   * Appends provided new fields at the end of the given schema
+   *
+   * NOTE: No deduplication is made, this method simply appends fields at the end of the list
+   *       of the source schema as is
+   */
+  public static Schema appendFieldsToSchema(Schema schema, List<Schema.Field> newFields) {
+    List<Schema.Field> fields = schema.getFields().stream()
+        .map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
+        .collect(Collectors.toList());
+    fields.addAll(newFields);
+
+    Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
+    newSchema.setFields(fields);
+    return newSchema;
+  }
+
+  /**
+   * Passed in {@code Union} schema and will try to resolve the field with the {@code fieldSchemaFullName}
+   * w/in the union returning its corresponding schema
+   *
+   * @param schema target schema to be inspected
+   * @param fieldSchemaFullName target field-name to be looked up w/in the union
+   * @return schema of the field w/in the union identified by the {@code fieldSchemaFullName}
+   */
+  public static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullName) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return schema;
+    }
+
+    List<Schema> innerTypes = schema.getTypes();
+    Schema nonNullType =
+        innerTypes.stream()
+            .filter(it -> it.getType() != Schema.Type.NULL && Objects.equals(it.getFullName(), fieldSchemaFullName))
+            .findFirst()
+            .orElse(null);
+
+    if (nonNullType == null) {
+      throw new AvroRuntimeException(
+          String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
+    }
+
+    return nonNullType;
+  }
+
+  /**
+   * Resolves typical Avro's nullable schema definition: {@code Union(Schema.Type.NULL, <NonNullType>)},
+   * decomposing union and returning the target non-null type
+   */
+  public static Schema resolveNullableSchema(Schema schema) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return schema;
+    }
+
+    List<Schema> innerTypes = schema.getTypes();
+    Schema nonNullType =
+        innerTypes.stream()
+            .filter(it -> it.getType() != Schema.Type.NULL)
+            .findFirst()
+            .orElse(null);
+
+    if (innerTypes.size() != 2 || nonNullType == null) {
+      throw new AvroRuntimeException(
+          String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
+    }
+
+    return nonNullType;
+  }
+
+  /**
+   * Creates schema following Avro's typical nullable schema definition: {@code Union(Schema.Type.NULL, <NonNullType>)},
+   * wrapping around provided target non-null type
+   */
+  public static Schema createNullableSchema(Schema.Type avroType) {
+    checkState(avroType != Schema.Type.NULL);
+    return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(avroType));
+  }
+
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 9367e23dc6..bf540a302e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.avro;
 
 import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.Conversions;
 import org.apache.avro.Conversions.DecimalConversion;
 import org.apache.avro.JsonProperties;
@@ -27,6 +26,7 @@ import org.apache.avro.LogicalTypes;
 import org.apache.avro.LogicalTypes.Decimal;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
@@ -64,19 +64,19 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.HashMap;
 import java.util.TimeZone;
-import java.util.Iterator;
-
 import java.util.stream.Collectors;
 
 import static org.apache.avro.Schema.Type.UNION;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
 
 /**
  * Helper class to do common stuff across Avro.
@@ -97,8 +97,7 @@ public class HoodieAvroUtils {
   private static final String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";
 
   // All metadata fields are optional strings.
-  public static final Schema METADATA_FIELD_SCHEMA =
-      Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
+  public static final Schema METADATA_FIELD_SCHEMA = createNullableSchema(Schema.Type.STRING);
 
   public static final Schema RECORD_KEY_SCHEMA = initRecordKeySchema();
 
@@ -327,31 +326,6 @@ public class HoodieAvroUtils {
     return record;
   }
 
-  /**
-   * Add null fields to passed in schema. Caller is responsible for ensuring there is no duplicates. As different query
-   * engines have varying constraints regarding treating the case-sensitivity of fields, its best to let caller
-   * determine that.
-   *
-   * @param schema        Passed in schema
-   * @param newFieldNames Null Field names to be added
-   */
-  public static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
-    List<Field> newFields = new ArrayList<>();
-    for (String newField : newFieldNames) {
-      newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE));
-    }
-    return createNewSchemaWithExtraFields(schema, newFields);
-  }
-
-  public static Schema createNewSchemaWithExtraFields(Schema schema, List<Field> newFields) {
-    List<Field> fields = schema.getFields().stream()
-        .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultVal())).collect(Collectors.toList());
-    fields.addAll(newFields);
-    Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
-    newSchema.setFields(fields);
-    return newSchema;
-  }
-
   /**
    * Adds the Hoodie commit metadata into the provided Generic Record.
    */
@@ -736,46 +710,6 @@ public class HoodieAvroUtils {
     return getRecordColumnValues(record, columns, schema.get(), consistentLogicalTimestampEnabled);
   }
 
-  private static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullName) {
-    if (schema.getType() != Schema.Type.UNION) {
-      return schema;
-    }
-
-    List<Schema> innerTypes = schema.getTypes();
-    Schema nonNullType =
-        innerTypes.stream()
-            .filter(it -> it.getType() != Schema.Type.NULL && Objects.equals(it.getFullName(), fieldSchemaFullName))
-            .findFirst()
-            .orElse(null);
-
-    if (nonNullType == null) {
-      throw new AvroRuntimeException(
-          String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
-    }
-
-    return nonNullType;
-  }
-
-  public static Schema resolveNullableSchema(Schema schema) {
-    if (schema.getType() != Schema.Type.UNION) {
-      return schema;
-    }
-
-    List<Schema> innerTypes = schema.getTypes();
-    Schema nonNullType =
-        innerTypes.stream()
-          .filter(it -> it.getType() != Schema.Type.NULL)
-          .findFirst()
-          .orElse(null);
-
-    if (innerTypes.size() != 2 || nonNullType == null) {
-      throw new AvroRuntimeException(
-          String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
-    }
-
-    return nonNullType;
-  }
-
   /**
    * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema.
    * support deep rewrite for nested record.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index 93e9ea5d34..89bad1c33f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -213,7 +213,7 @@ public class HoodiePartitionMetadata {
       format = Option.empty();
       return true;
     } catch (Throwable t) {
-      LOG.warn("Unable to read partition meta properties file for partition " + partitionPath, t);
+      LOG.debug("Unable to read partition meta properties file for partition " + partitionPath);
       return false;
     }
   }
@@ -229,8 +229,7 @@ public class HoodiePartitionMetadata {
         format = Option.of(reader.getFormat());
         return true;
       } catch (Throwable t) {
-        // any error, log, check the next base format
-        LOG.warn("Unable to read partition metadata " + metafilePath.getName() + " for partition " + partitionPath, t);
+        LOG.debug("Unable to read partition metadata " + metafilePath.getName() + " for partition " + partitionPath);
       }
     }
     return false;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 254044bd28..bbc508bd5f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -607,7 +607,7 @@ public class HoodieTableConfig extends HoodieConfig {
     return getString(URL_ENCODE_PARTITIONING);
   }
 
-  public Boolean isDropPartitionColumns() {
+  public Boolean shouldDropPartitionColumns() {
     return getBooleanOrDefault(DROP_PARTITION_COLUMNS);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 262157a8ae..f178a23eee 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -23,11 +23,9 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.IndexedRecord;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -47,15 +45,13 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIncompatibleSchemaException;
 import org.apache.hudi.exception.InvalidTableException;
-import org.apache.hudi.io.storage.HoodieHFileReader;
-import org.apache.hudi.io.storage.HoodieOrcReader;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
-
+import org.apache.hudi.io.storage.HoodieHFileReader;
+import org.apache.hudi.io.storage.HoodieOrcReader;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -67,6 +63,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
+
 /**
  * Helper class to read schema from data files and log files and to convert it between different formats.
  *
@@ -189,7 +188,7 @@ public class TableSchemaResolver {
     }
 
     Option<String[]> partitionFieldsOpt = metaClient.getTableConfig().getPartitionFields();
-    if (metaClient.getTableConfig().isDropPartitionColumns()) {
+    if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
       schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, schema);
     }
     return schema;
@@ -222,9 +221,9 @@ public class TableSchemaResolver {
         List<Field> newFields = new ArrayList<>();
         for (String partitionField: partitionFields) {
           newFields.add(new Schema.Field(
-              partitionField, Schema.create(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
+              partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
         }
-        schema = HoodieAvroUtils.createNewSchemaWithExtraFields(schema, newFields);
+        schema = appendFieldsToSchema(schema, newFields);
       }
     }
     return schema;
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 3904ff6f83..c0e97f3309 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -89,10 +89,10 @@ import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes;
 import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
-import static org.apache.hudi.avro.HoodieAvroUtils.resolveNullableSchema;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 59a24a79f0..e0e57e812b 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table;
 
 import org.apache.avro.Schema;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 
@@ -57,7 +58,7 @@ public class TestTableSchemaResolver {
     assertNotEquals(originSchema, s4);
     assertTrue(s4.getFields().stream().anyMatch(f -> f.name().equals("user_partition")));
     Schema.Field f = s4.getField("user_partition");
-    assertEquals(f.schema().getType().getName(), "string");
+    assertEquals(f.schema(), AvroSchemaUtils.createNullableSchema(Schema.Type.STRING));
 
     // case5: user_partition is in originSchema, but partition_path is in originSchema
     String[] pts4 = {"user_partition", "partition_path"};
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 0aa74ef154..0e4f9c304c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,13 +18,7 @@
 
 package org.apache.hudi.hadoop.utils;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
@@ -32,8 +26,8 @@ import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
@@ -46,6 +40,12 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -60,6 +60,9 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
+
 public class HoodieRealtimeRecordReaderUtils {
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);
 
@@ -287,6 +290,14 @@ public class HoodieRealtimeRecordReaderUtils {
     List<String> fieldsToAdd = partitioningFields.stream().map(String::toLowerCase)
         .filter(x -> !firstLevelFieldNames.contains(x)).collect(Collectors.toList());
 
-    return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd);
+    return appendNullSchemaFields(schema, fieldsToAdd);
+  }
+
+  private static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
+    List<Schema.Field> newFields = new ArrayList<>();
+    for (String newField : newFieldNames) {
+      newFields.add(new Schema.Field(newField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
+    }
+    return appendFieldsToSchema(schema, newFields);
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index cc8fb0492a..556b0feef1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -17,4 +17,4 @@
 
 
 org.apache.hudi.DefaultSource
-org.apache.spark.sql.execution.datasources.parquet.SparkHoodieParquetFileFormat
\ No newline at end of file
+org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index adf94fffde..5414a228c7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -20,14 +20,13 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
 import org.apache.hudi.common.model.HoodieFileFormat
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
 import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
@@ -56,6 +55,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
   override type FileSplit = HoodieBaseFileSplit
 
   override lazy val mandatoryColumns: Seq[String] =
+    // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
     Seq(recordKeyField)
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
@@ -65,14 +65,14 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
 
   protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
                                     partitionSchema: StructType,
-                                    tableSchema: HoodieTableSchema,
+                                    dataSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
                                     filters: Array[Filter]): HoodieUnsafeRDD = {
 
     val baseFileReader = createBaseFileReader(
       spark = sparkSession,
       partitionSchema = partitionSchema,
-      tableSchema = tableSchema,
+      dataSchema = dataSchema,
       requiredSchema = requiredSchema,
       filters = filters,
       options = optParams,
@@ -114,16 +114,38 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
    *       rule; you can find more details in HUDI-3896)
    */
   def toHadoopFsRelation: HadoopFsRelation = {
+    // We're delegating to Spark to append partition values to every row only in cases
+    // when these corresponding partition-values are not persisted w/in the data file itself
+    val shouldAppendPartitionColumns = shouldOmitPartitionColumns
+
     val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
-      case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+      case HoodieFileFormat.PARQUET =>
+        (sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, HoodieParquetFileFormat.FILE_FORMAT_ID)
       case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
     }
 
     if (globPaths.isEmpty) {
+      // NOTE: There are currently 2 ways partition values could be fetched:
+      //          - Source columns (producing the values used for physical partitioning) will be read
+      //          from the data file
+      //          - Values parsed from the actual partition pat would be appended to the final dataset
+      //
+      //        In the former case, we don't need to provide the partition-schema to the relation,
+      //        therefore we simply stub it w/ empty schema and use full table-schema as the one being
+      //        read from the data file.
+      //
+      //        In the latter, we have to specify proper partition schema as well as "data"-schema, essentially
+      //        being a table-schema with all partition columns stripped out
+      val (partitionSchema, dataSchema) = if (shouldAppendPartitionColumns) {
+        (fileIndex.partitionSchema, fileIndex.dataSchema)
+      } else {
+        (StructType(Nil), tableStructSchema)
+      }
+
       HadoopFsRelation(
         location = fileIndex,
-        partitionSchema = fileIndex.partitionSchema,
-        dataSchema = fileIndex.dataSchema,
+        partitionSchema = partitionSchema,
+        dataSchema = dataSchema,
         bucketSpec = None,
         fileFormat = tableFileFormat,
         optParams)(sparkSession)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 53667f3b88..2fd1da5950 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation.getPartitionPath
+import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
 import org.apache.hudi.common.fs.FSUtils
@@ -36,12 +36,13 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.io.storage.HoodieHFileReader
-import org.apache.spark.TaskContext
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.HoodieAvroSchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection}
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -50,11 +51,11 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SQLContext, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
-import java.io.Closeable
 import java.net.URI
+import java.util.Locale
 import scala.collection.JavaConverters._
-import scala.util.Try
 import scala.util.control.NonFatal
+import scala.util.{Failure, Success, Try}
 
 trait HoodieFileSplit {}
 
@@ -78,7 +79,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   extends BaseRelation
     with FileRelation
     with PrunedFilteredScan
-    with SparkAdapterSupport
     with Logging {
 
   type FileSplit <: HoodieFileSplit
@@ -125,14 +125,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = {
     val schemaUtil = new TableSchemaResolver(metaClient)
-    val avroSchema = Try(schemaUtil.getTableAvroSchema).getOrElse(
-      // If there is no commit in the table, we can't get the schema
-      // t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
-      userSchema match {
-        case Some(s) => sparkAdapter.getAvroSchemaConverters.toAvroType(s, nullable = false, "record")
-        case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
-      }
-    )
+    val avroSchema = Try(schemaUtil.getTableAvroSchema) match {
+      case Success(schema) => schema
+      case Failure(e) =>
+        logWarning("Failed to fetch schema from the table", e)
+        // If there is no commit in the table, we can't get the schema
+        // t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
+        userSchema match {
+          case Some(s) => convertToAvroSchema(s)
+          case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
+        }
+    }
     // try to find internalSchema
     val internalSchemaFromMeta = try {
       schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
@@ -146,11 +149,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
 
-  /**
-   * if true, need to deal with schema for creating file reader.
-   */
-  protected val dropPartitionColumnsWhenWrite: Boolean =
-    metaClient.getTableConfig.isDropPartitionColumns && partitionColumns.nonEmpty
+  protected val shouldOmitPartitionColumns: Boolean =
+    metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
 
   /**
    * NOTE: PLEASE READ THIS CAREFULLY
@@ -205,14 +205,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    // NOTE: PLEAS READ CAREFULLY BEFORE MAKING CHANGES
+    //
+    //       In case list of requested columns doesn't contain the Primary Key one, we
     //       have to add it explicitly so that
     //          - Merging could be performed correctly
     //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
-    //          Spark still fetches all the rows to execute the query correctly
+    //            Spark still fetches all the rows to execute the query correctly
     //
-    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
-    //       filtered out upstream
+    //       *Appending* additional columns to the ones requested by the caller is not a problem, as those
+    //       will be "projected out" by the caller's projection;
+    //
+    // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
+    //       PROJECTION
     val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
 
     val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
@@ -223,56 +228,62 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
     val fileSplits = collectFileSplits(partitionFilters, dataFilters)
 
-    val partitionSchema = if (dropPartitionColumnsWhenWrite) {
-      // when hoodie.datasource.write.drop.partition.columns is true, partition columns can't be persisted in
-      // data files.
-      StructType(partitionColumns.map(StructField(_, StringType)))
-    } else {
-      StructType(Nil)
-    }
 
-    val tableSchema = HoodieTableSchema(tableStructSchema, if (internalSchema.isEmptySchema) tableAvroSchema.toString else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString, internalSchema)
-    val dataSchema = if (dropPartitionColumnsWhenWrite) {
-      val dataStructType = StructType(tableStructSchema.filterNot(f => partitionColumns.contains(f.name)))
-      HoodieTableSchema(
-        dataStructType,
-        sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType, nullable = false, "record").toString()
-      )
-    } else {
-      tableSchema
-    }
-    val requiredSchema = if (dropPartitionColumnsWhenWrite) {
-      val requiredStructType = StructType(requiredStructSchema.filterNot(f => partitionColumns.contains(f.name)))
-      HoodieTableSchema(
-        requiredStructType,
-        sparkAdapter.getAvroSchemaConverters.toAvroType(requiredStructType, nullable = false, "record").toString()
-      )
+    val tableAvroSchemaStr =
+      if (internalSchema.isEmptySchema) tableAvroSchema.toString
+      else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString
+
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchema)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
+
+    // Since schema requested by the caller might contain partition columns, we might need to
+    // prune it, removing all partition columns from it in case these columns are not persisted
+    // in the data files
+    //
+    // NOTE: This partition schema is only relevant to file reader to be able to embed
+    //       values of partition columns (hereafter referred to as partition values) encoded into
+    //       the partition path, and omitted from the data file, back into fetched rows;
+    //       Note that, by default, partition columns are not omitted therefore specifying
+    //       partition schema for reader is not required
+    val (partitionSchema, dataSchema, prunedRequiredSchema) =
+      tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+    if (fileSplits.isEmpty) {
+      sparkSession.sparkContext.emptyRDD
     } else {
-      HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
+      val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, prunedRequiredSchema, filters)
+
+      // NOTE: In case when partition columns have been pruned from the required schema, we have to project
+      //       the rows from the pruned schema back into the one expected by the caller
+      val projectedRDD = if (prunedRequiredSchema.structTypeSchema != requiredSchema.structTypeSchema) {
+        rdd.mapPartitions { it =>
+          val fullPrunedSchema = StructType(prunedRequiredSchema.structTypeSchema.fields ++ partitionSchema.fields)
+          val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema)
+          it.map(unsafeProjection)
+        }
+      } else {
+        rdd
+      }
+
+      // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
+      // Please check [[needConversion]] scala-doc for more details
+      projectedRDD.asInstanceOf[RDD[Row]]
     }
-    // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
-    // Please check [[needConversion]] scala-doc for more details
-    if (fileSplits.nonEmpty)
-      composeRDD(fileSplits, partitionSchema, dataSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
-    else
-      sparkSession.sparkContext.emptyRDD
   }
 
-
-
   /**
    * Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied
    *
    * @param fileSplits      file splits to be handled by the RDD
    * @param partitionSchema target table's partition schema
-   * @param tableSchema     target table's schema
+   * @param dataSchema      target table's data files' schema
    * @param requiredSchema  projected schema required by the reader
    * @param filters         data filters to be applied
    * @return instance of RDD (implementing [[HoodieUnsafeRDD]])
    */
   protected def composeRDD(fileSplits: Seq[FileSplit],
                            partitionSchema: StructType,
-                           tableSchema: HoodieTableSchema,
+                           dataSchema: HoodieTableSchema,
                            requiredSchema: HoodieTableSchema,
                            filters: Array[Filter]): HoodieUnsafeRDD
 
@@ -325,16 +336,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   }
 
   protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
-    if (dropPartitionColumnsWhenWrite) {
-      if (requestedColumns.isEmpty) {
-        mandatoryColumns.toArray
-      } else {
-        requestedColumns
-      }
-    } else {
-      val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
-      requestedColumns ++ missing
-    }
+    val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
+    requestedColumns ++ missing
   }
 
   protected def getTableState: HoodieTableState = {
@@ -364,7 +367,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = {
     try {
       val tableConfig = metaClient.getTableConfig
-      if (dropPartitionColumnsWhenWrite) {
+      if (shouldOmitPartitionColumns) {
         val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString
         val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
         if (hiveStylePartitioningEnabled) {
@@ -388,40 +391,47 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
         InternalRow.empty
     }
   }
-}
 
-object HoodieBaseRelation {
-
-  def getPartitionPath(fileStatus: FileStatus): Path =
-    fileStatus.getPath.getParent
+  protected def getColName(f: StructField): String = {
+    if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
+      f.name
+    } else {
+      f.name.toLowerCase(Locale.ROOT)
+    }
+  }
 
   /**
    * Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]]
    * over [[InternalRow]]
    */
-  def createBaseFileReader(spark: SparkSession,
-                           partitionSchema: StructType,
-                           tableSchema: HoodieTableSchema,
-                           requiredSchema: HoodieTableSchema,
-                           filters: Seq[Filter],
-                           options: Map[String, String],
-                           hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+  protected def createBaseFileReader(spark: SparkSession,
+                                     partitionSchema: StructType,
+                                     dataSchema: HoodieTableSchema,
+                                     requiredSchema: HoodieTableSchema,
+                                     filters: Seq[Filter],
+                                     options: Map[String, String],
+                                     hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
     val hfileReader = createHFileReader(
       spark = spark,
-      tableSchema = tableSchema,
+      dataSchema = dataSchema,
       requiredSchema = requiredSchema,
       filters = filters,
       options = options,
       hadoopConf = hadoopConf
     )
+
+    // We're delegating to Spark to append partition values to every row only in cases
+    // when these corresponding partition-values are not persisted w/in the data file itself
+    val shouldAppendPartitionColumns = shouldOmitPartitionColumns
     val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
       sparkSession = spark,
-      dataSchema = tableSchema.structTypeSchema,
+      dataSchema = dataSchema.structTypeSchema,
       partitionSchema = partitionSchema,
       requiredSchema = requiredSchema.structTypeSchema,
       filters = filters,
       options = options,
-      hadoopConf = hadoopConf
+      hadoopConf = hadoopConf,
+      appendPartitionValues = shouldAppendPartitionColumns
     )
 
     partitionedFile => {
@@ -436,8 +446,38 @@ object HoodieBaseRelation {
     }
   }
 
+  private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
+                                       requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
+    if (shouldOmitPartitionColumns) {
+      val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
+      val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema)
+      val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema)
+
+      (partitionSchema,
+        HoodieTableSchema(prunedDataStructSchema, convertToAvroSchema(prunedDataStructSchema).toString),
+        HoodieTableSchema(prunedRequiredSchema, convertToAvroSchema(prunedRequiredSchema).toString))
+    } else {
+      (StructType(Nil), tableSchema, requiredSchema)
+    }
+  }
+
+  private def prunePartitionColumns(dataStructSchema: StructType): StructType =
+    StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))
+}
+
+object HoodieBaseRelation extends SparkAdapterSupport {
+
+  private def generateUnsafeProjection(from: StructType, to: StructType) =
+    sparkAdapter.createCatalystExpressionUtils().generateUnsafeProjection(from, to)
+
+  def convertToAvroSchema(structSchema: StructType): Schema =
+    sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record")
+
+  def getPartitionPath(fileStatus: FileStatus): Path =
+    fileStatus.getPath.getParent
+
   private def createHFileReader(spark: SparkSession,
-                                tableSchema: HoodieTableSchema,
+                                dataSchema: HoodieTableSchema,
                                 requiredSchema: HoodieTableSchema,
                                 filters: Seq[Filter],
                                 options: Map[String, String],
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index 02264bc4a6..1fc9e70a5a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -21,6 +21,7 @@ package org.apache.hudi
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.utils.SerDeHelper
 import org.apache.spark.sql.SparkSession
@@ -38,8 +39,8 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
 
 
   /**
-   * Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]]
-   * to deal with [[ColumnarBatch]] when enable parquet vectorized reader if necessary.
+   * Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]] handling [[ColumnarBatch]],
+   * when Parquet's Vectorized Reader is used
    */
   def buildHoodieParquetReader(sparkSession: SparkSession,
                                dataSchema: StructType,
@@ -47,9 +48,11 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
                                requiredSchema: StructType,
                                filters: Seq[Filter],
                                options: Map[String, String],
-                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+                               hadoopConf: Configuration,
+                               appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = {
 
-    val readParquetFile: PartitionedFile => Iterator[Any] = sparkAdapter.createHoodieParquetFileFormat().get.buildReaderWithPartitionValues(
+    val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
+    val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues(
       sparkSession = sparkSession,
       dataSchema = dataSchema,
       partitionSchema = partitionSchema,
@@ -91,9 +94,12 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
     * @param validCommits valid commits, using give validCommits to validate all legal histroy Schema files, and return the latest one.
     */
   def getConfigurationWithInternalSchema(conf: Configuration, internalSchema: InternalSchema, tablePath: String, validCommits: String): Configuration = {
-    conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
-    conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath)
-    conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
+    val querySchemaString = SerDeHelper.toJson(internalSchema)
+    if (!isNullOrEmpty(querySchemaString)) {
+      conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
+      conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath)
+      conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
+    }
     conf
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c86b1615ba..38062aa802 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -88,7 +88,7 @@ object HoodieSparkSqlWriter {
 
     val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
     val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
-    val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestmapBasedKeyGenerator(
+    val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
       originKeyGeneratorClassName, parameters)
     //validate datasource and tableconfig keygen are the same
     validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
@@ -758,7 +758,7 @@ object HoodieSparkSqlWriter {
     (params, HoodieWriterUtils.convertMapToHoodieConfig(params))
   }
 
-  private def extractConfigsRelatedToTimestmapBasedKeyGenerator(keyGenerator: String,
+  private def extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenerator: String,
       params: Map[String, String]): Map[String, String] = {
     if (keyGenerator.equals(classOf[TimestampBasedKeyGenerator].getCanonicalName) ||
         keyGenerator.equals(classOf[TimestampBasedAvroKeyGenerator].getCanonicalName)) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 039dafb596..d9d5812adb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -20,8 +20,8 @@ package org.apache.hudi
 import org.apache.avro.Schema
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieFileFormat, HoodieRecord, HoodieReplaceCommitMetadata}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-import java.util.stream.Collectors
 
+import java.util.stream.Collectors
 import org.apache.hadoop.fs.{GlobPattern, Path}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -36,6 +36,7 @@ import org.apache.hudi.table.HoodieSparkTable
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
@@ -183,7 +184,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
       sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath)
       sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
       val formatClassName = metaClient.getTableConfig.getBaseFileFormat match {
-        case HoodieFileFormat.PARQUET => if (!internalSchema.isEmptySchema) "HoodieParquet" else "parquet"
+        case HoodieFileFormat.PARQUET => HoodieParquetFileFormat.FILE_FORMAT_ID
         case HoodieFileFormat.ORC => "orc"
       }
       sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 46e395fc2b..6aa7007851 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -19,9 +19,7 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{GlobPattern, Path}
-import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -61,14 +59,14 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
 
   protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
                                     partitionSchema: StructType,
-                                    tableSchema: HoodieTableSchema,
+                                    dataSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
                                     filters: Array[Filter]): HoodieMergeOnReadRDD = {
     val fullSchemaParquetReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
-      tableSchema = tableSchema,
-      requiredSchema = tableSchema,
+      dataSchema = dataSchema,
+      requiredSchema = dataSchema,
       // This file-reader is used to read base file records, subsequently merging them with the records
       // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
       // applying any user-defined filtering _before_ we complete combining them w/ delta-log records (to make sure that
@@ -86,7 +84,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
     val requiredSchemaParquetReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
-      tableSchema = tableSchema,
+      dataSchema = dataSchema,
       requiredSchema = requiredSchema,
       filters = filters ++ incrementalSpanRecordFilters,
       options = optParams,
@@ -99,7 +97,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
     // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
     //                 filtered, since file-reader might not be capable to perform filtering
     new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
-      tableSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
+      dataSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
   }
 
   override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index d85788e25b..a88eb63036 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -20,17 +20,14 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
-import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.sources.Filter
@@ -63,14 +60,14 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
   protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
                                     partitionSchema: StructType,
-                                    tableSchema: HoodieTableSchema,
+                                    dataSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
                                     filters: Array[Filter]): HoodieMergeOnReadRDD = {
     val fullSchemaParquetReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
-      tableSchema = tableSchema,
-      requiredSchema = tableSchema,
+      dataSchema = dataSchema,
+      requiredSchema = dataSchema,
       // This file-reader is used to read base file records, subsequently merging them with the records
       // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
       // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that
@@ -85,7 +82,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
     val requiredSchemaParquetReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
-      tableSchema = tableSchema,
+      dataSchema = dataSchema,
       requiredSchema = requiredSchema,
       filters = filters,
       options = optParams,
@@ -96,7 +93,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
     val tableState = getTableState
     new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
-      tableSchema, requiredSchema, tableState, mergeType, fileSplits)
+      dataSchema, requiredSchema, tableState, mergeType, fileSplits)
   }
 
   protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 1305323bd1..cd1c1fb4af 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -120,6 +120,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name)))
   }
 
+  /**
+   * @VisibleForTesting
+   */
   def partitionSchema: StructType = {
     if (queryAsNonePartitionedTable) {
       // If we read it as Non-Partitioned table, we should not
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
similarity index 58%
rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala
rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
index 150178ea69..dbb62d089e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
@@ -23,26 +23,32 @@ import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat.FILE_FORMAT_ID
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 
-class SparkHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
-  override def shortName(): String = "HoodieParquet"
+class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
 
-  override def toString: String = "HoodieParquet"
+  override def shortName(): String = FILE_FORMAT_ID
 
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+  override def toString: String = "Hoodie-Parquet"
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
     sparkAdapter
-      .createHoodieParquetFileFormat().get
+      .createHoodieParquetFileFormat(appendPartitionValues = false).get
       .buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
   }
 }
 
+object HoodieParquetFileFormat {
+
+  val FILE_FORMAT_ID = "hoodie-parquet"
+
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index b232ef010f..28a6dcdcd6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -747,7 +747,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(resultSchema, schema1)
   }
 
-  @ParameterizedTest @ValueSource(booleans = Array(true, false))
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
   def testCopyOnWriteWithDropPartitionColumns(enableDropPartitionColumns: Boolean) {
     val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 100)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -897,9 +898,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
       readResult.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(","))
   }
 
-  @Disabled("HUDI-3204")
-  @Test
-  def testHoodieBaseFileOnlyViewRelation(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testHoodieBaseFileOnlyViewRelation(useGlobbing: Boolean): Unit = {
     val _spark = spark
     import _spark.implicits._
 
@@ -925,18 +926,27 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .mode(org.apache.spark.sql.SaveMode.Append)
       .save(basePath)
 
-    val res = spark.read.format("hudi").load(basePath)
+    // NOTE: We're testing here that both paths are appropriately handling
+    //       partition values, regardless of whether we're reading the table
+    //       t/h a globbed path or not
+    val path = if (useGlobbing) {
+      s"$basePath/*/*/*/*"
+    } else {
+      basePath
+    }
+
+    val res = spark.read.format("hudi").load(path)
 
     assert(res.count() == 2)
 
     // data_date is the partition field. Persist to the parquet file using the origin values, and read it.
     assertEquals(
-      res.select("data_date").map(_.get(0).toString).collect().sorted,
-      Array("2018-09-23", "2018-09-24")
+      res.select("data_date").map(_.get(0).toString).collect().sorted.toSeq,
+      Seq("2018-09-23", "2018-09-24")
     )
     assertEquals(
-      res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted,
-      Array("2018/09/23", "2018/09/24")
+      res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq,
+      Seq("2018/09/23", "2018/09/24")
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index bd7edd4db5..48bb46f81b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -57,7 +57,6 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"
 
-  @Disabled("HUDI-3896")
   @ParameterizedTest
   @CsvSource(Array(
     "true,org.apache.hudi.keygen.SimpleKeyGenerator",
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index e4b3c4010a..0e74c997d7 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil}
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
@@ -165,7 +165,7 @@ class Spark2Adapter extends SparkAdapter {
     }
   }
 
-  override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
-    Some(new ParquetFileFormat)
+  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
new file mode 100644
index 0000000000..6fb5c50c03
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[AvroDeserializer]] impl from Spark 2.4.4 w/ w/ the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data file</li>
+ * </ol>
+ */
+class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    // TODO: if you move this into the closure it reverts to the default values.
+    // If true, enable using the custom RecordReader for parquet. This only works for
+    // a subset of the types (no complex types).
+    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+    val enableVectorizedReader: Boolean =
+      sqlConf.parquetVectorizedReaderEnabled &&
+        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
+    val capacity = sqlConf.parquetVectorizedReaderBatchSize
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+    (file: PartitionedFile) => {
+      assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
+
+      val fileSplit =
+        new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+      val filePath = fileSplit.getPath
+
+      val split =
+        new org.apache.parquet.hadoop.ParquetInputSplit(
+          filePath,
+          fileSplit.getStart,
+          fileSplit.getStart + fileSplit.getLength,
+          fileSplit.getLength,
+          fileSplit.getLocations,
+          null)
+
+      val sharedConf = broadcastedHadoopConf.value.value
+
+      lazy val footerFileMetaData =
+        ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        val parquetSchema = footerFileMetaData.getSchema
+        val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
+          pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+        filters
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(parquetFilters.createFilter(parquetSchema, _))
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
+
+      // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
+      // *only* if the file was created by something other than "parquet-mr", so check the actual
+      // writer here for this file.  We have to do this per-file, as each file in the table may
+      // have different writers.
+      // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+      def isCreatedByParquetMr: Boolean =
+        footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+      val convertTz =
+        if (timestampConversion && !isCreatedByParquetMr) {
+          Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+        } else {
+          None
+        }
+
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
+
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      if (pushed.isDefined) {
+        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+      }
+      val taskContext = Option(TaskContext.get())
+      if (enableVectorizedReader) {
+        val vectorizedReader = new VectorizedParquetRecordReader(
+          convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+        val iter = new RecordReaderIterator(vectorizedReader)
+        // SPARK-23457 Register a task completion lister before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+        vectorizedReader.initialize(split, hadoopAttemptContext)
+        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+
+        // NOTE: We're making appending of the partitioned values to the rows read from the
+        //       data file configurable
+        if (shouldAppendPartitionValues) {
+          vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+        } else {
+          vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
+        }
+
+        if (returningBatch) {
+          vectorizedReader.enableReturningBatches()
+        }
+
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
+      } else {
+        logDebug(s"Falling back to parquet-mr")
+        // ParquetRecordReader returns UnsafeRow
+        val reader = if (pushed.isDefined && enableRecordFilter) {
+          val parquetFilter = FilterCompat.get(pushed.get, null)
+          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
+        } else {
+          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+        }
+        val iter = new RecordReaderIterator(reader)
+        // SPARK-23457 Register a task completion lister before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+        reader.initialize(split, hadoopAttemptContext)
+
+        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+        val joinedRow = new JoinedRow()
+        val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+        // This is a horrible erasure hack...  if we type the iterator above, then it actually check
+        // the type in next() and we get a class cast exception.  If we make that function return
+        // Object, then we can defer the cast until later!
+        //
+        // NOTE: We're making appending of the partitioned values to the rows read from the
+        //       data file configurable
+        if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
+          // There is no partition columns
+          iter.asInstanceOf[Iterator[InternalRow]]
+        } else {
+          iter.asInstanceOf[Iterator[InternalRow]]
+            .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+        }
+
+      }
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index 13dba82488..cd5cd9c82f 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -19,14 +19,13 @@
 package org.apache.spark.sql.adapter
 
 import org.apache.avro.Schema
-import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer, HoodieSparkAvroSchemaConverters}
-import org.apache.spark.sql.hudi.SparkAdapter
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils}
 import org.apache.spark.SPARK_VERSION
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark312HoodieParquetFileFormat}
+import org.apache.spark.sql.hudi.SparkAdapter
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils, SparkSession}
 
 /**
@@ -55,14 +54,7 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
     }
   }
 
-  override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
-    if (SPARK_VERSION.startsWith("3.1")) {
-      val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat"
-      val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
-      val ctor = clazz.getConstructors.head
-      Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
-    } else {
-      None
-    }
+  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark312HoodieParquetFileFormat(appendPartitionValues))
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
index 83b3162bbc..769373866f 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
@@ -17,279 +17,312 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.net.URI
-import java.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils}
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
-
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
-import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
-      // fallback to origin parquet File read
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-    } else {
-      hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
-      hadoopConf.set(
-        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        ParquetWriteSupport.SPARK_ROW_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        SQLConf.SESSION_LOCAL_TIMEZONE.key,
-        sparkSession.sessionState.conf.sessionLocalTimeZone)
-      hadoopConf.setBoolean(
-        SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-        sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
-      hadoopConf.setBoolean(
-        SQLConf.CASE_SENSITIVE.key,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-      ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
-
-      // Sets flags for `ParquetToSparkSchemaConverter`
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        sparkSession.sessionState.conf.isParquetBinaryAsString)
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
-      // for dataSource v1, we have no method to do project for spark physical plan.
-      // it's safe to do cols project here.
-      val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-      val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-      if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
-        val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
-        hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema))
+import java.net.URI
+
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[AvroDeserializer]] impl from Spark 3.1.2 w/ w/ the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data file</li>
+ *   <li>Schema on-read</li>
+ * </ol>
+ */
+class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+    val internalSchemaStr = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+    // For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself,
+    // therefore it's safe to do schema projection here
+    if (!isNullOrEmpty(internalSchemaStr)) {
+      val prunedInternalSchemaStr =
+        pruneInternalSchema(internalSchemaStr, requiredSchema)
+      hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
+    }
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    // TODO: if you move this into the closure it reverts to the default values.
+    // If true, enable using the custom RecordReader for parquet. This only works for
+    // a subset of the types (no complex types).
+    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+    val enableVectorizedReader: Boolean =
+      sqlConf.parquetVectorizedReaderEnabled &&
+        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
+    val capacity = sqlConf.parquetVectorizedReaderBatchSize
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+    (file: PartitionedFile) => {
+      assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
+
+      val filePath = new Path(new URI(file.filePath))
+      val split =
+        new org.apache.parquet.hadoop.ParquetInputSplit(
+          filePath,
+          file.start,
+          file.start + file.length,
+          file.length,
+          Array.empty,
+          null)
+
+      val sharedConf = broadcastedHadoopConf.value.value
+
+      // Fetch internal schema
+      val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+      // Internal schema has to be pruned at this point
+      val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+
+      val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
+
+      val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+      val fileSchema = if (shouldUseInternalSchema) {
+        val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+        InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+      } else {
+        null
       }
-      val broadcastedHadoopConf =
-        sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-
-      // TODO: if you move this into the closure it reverts to the default values.
-      // If true, enable using the custom RecordReader for parquet. This only works for
-      // a subset of the types (no complex types).
-      val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
-      val sqlConf = sparkSession.sessionState.conf
-      val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-      val enableVectorizedReader: Boolean =
-        sqlConf.parquetVectorizedReaderEnabled &&
-          resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
-      val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
-      val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
-      val capacity = sqlConf.parquetVectorizedReaderBatchSize
-      val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
-      // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-      val returningBatch = supportBatch(sparkSession, resultSchema)
-      val pushDownDate = sqlConf.parquetFilterPushDownDate
-      val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
-      val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-      val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
-      val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-      val isCaseSensitive = sqlConf.caseSensitiveAnalysis
-
-      (file: PartitionedFile) => {
-        assert(file.partitionValues.numFields == partitionSchema.size)
-        val filePath = new Path(new URI(file.filePath))
-        val split =
-          new org.apache.parquet.hadoop.ParquetInputSplit(
-            filePath,
-            file.start,
-            file.start + file.length,
-            file.length,
-            Array.empty,
-            null)
-        val sharedConf = broadcastedHadoopConf.value.value
-        // do deal with internalSchema
-        val internalSchemaString = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-        // querySchema must be a pruned schema.
-        val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-        val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty || !querySchemaOption.isPresent) false else true
-        val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
-        val fileSchema = if (internalSchemaChangeEnabled) {
-          val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-          InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+
+      lazy val footerFileMetaData =
+        ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        val parquetSchema = footerFileMetaData.getSchema
+        val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseMode)
         } else {
-          // this should not happened, searchSchemaAndCache will deal with correctly.
-          null
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive)
         }
+        filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(parquetFilters.createFilter)
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
 
-        lazy val footerFileMetaData =
-          ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-        val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-          footerFileMetaData.getKeyValueMetaData.get,
-          SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
-        // Try to push down filters when filter push-down is enabled.
-        val pushed = if (enableParquetFilterPushDown) {
-          val parquetSchema = footerFileMetaData.getSchema
-          val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
-            Spark312HoodieParquetFileFormat.createParquetFilters(
-              parquetSchema,
-              pushDownDate,
-              pushDownTimestamp,
-              pushDownDecimal,
-              pushDownStringStartWith,
-              pushDownInFilterThreshold,
-              isCaseSensitive,
-              datetimeRebaseMode)
-          } else {
-            Spark312HoodieParquetFileFormat.createParquetFilters(
-              parquetSchema,
-              pushDownDate,
-              pushDownTimestamp,
-              pushDownDecimal,
-              pushDownStringStartWith,
-              pushDownInFilterThreshold,
-              isCaseSensitive)
-          }
-          filters.map(Spark312HoodieParquetFileFormat.rebuildFilterFromParquet(_, fileSchema, querySchemaOption.get()))
-            // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-            // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-            // is used here.
-            .flatMap(parquetFilters.createFilter(_))
-            .reduceOption(FilterApi.and)
+      // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
+      // *only* if the file was created by something other than "parquet-mr", so check the actual
+      // writer here for this file.  We have to do this per-file, as each file in the table may
+      // have different writers.
+      // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+      def isCreatedByParquetMr: Boolean =
+        footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+      val convertTz =
+        if (timestampConversion && !isCreatedByParquetMr) {
+          Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
         } else {
           None
         }
 
-        // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
-        // *only* if the file was created by something other than "parquet-mr", so check the actual
-        // writer here for this file.  We have to do this per-file, as each file in the table may
-        // have different writers.
-        // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
-        def isCreatedByParquetMr: Boolean =
-          footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
-
-        val convertTz =
-          if (timestampConversion && !isCreatedByParquetMr) {
-            Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
+
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+
+      // Clone new conf
+      val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
+      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
+      if (shouldUseInternalSchema) {
+        val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
+        val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
+        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+        hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+      }
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
+
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      if (pushed.isDefined) {
+        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+      }
+      val taskContext = Option(TaskContext.get())
+      if (enableVectorizedReader) {
+        val vectorizedReader =
+          if (shouldUseInternalSchema) {
+            new Spark312HoodieVectorizedParquetRecordReader(
+              convertTz.orNull,
+              datetimeRebaseMode.toString,
+              int96RebaseMode.toString,
+              enableOffHeapColumnVector && taskContext.isDefined,
+              capacity,
+              typeChangeInfos)
           } else {
-            None
+            new VectorizedParquetRecordReader(
+              convertTz.orNull,
+              datetimeRebaseMode.toString,
+              int96RebaseMode.toString,
+              enableOffHeapColumnVector && taskContext.isDefined,
+              capacity)
           }
-        val int96RebaseMode = DataSourceUtils.int96RebaseMode(
-          footerFileMetaData.getKeyValueMetaData.get,
-          SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
-
-        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-        // use new conf
-        val hadoopAttempConf = new Configuration(broadcastedHadoopConf.value.value)
-        //
-        // reset request schema
-        var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-        if (internalSchemaChangeEnabled) {
-          val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
-          val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-          typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
-          hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
-        }
-        val hadoopAttemptContext =
-          new TaskAttemptContextImpl(hadoopAttempConf, attemptId)
 
-        // Try to push down filters when filter push-down is enabled.
-        // Notice: This push-down is RowGroups level, not individual records.
-        if (pushed.isDefined) {
-          ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
-        }
-        val taskContext = Option(TaskContext.get())
-        if (enableVectorizedReader) {
-          val vectorizedReader = new Spark312HoodieVectorizedParquetRecordReader(
-            convertTz.orNull,
-            datetimeRebaseMode.toString,
-            int96RebaseMode.toString,
-            enableOffHeapColumnVector && taskContext.isDefined,
-            capacity, typeChangeInfos)
-          val iter = new RecordReaderIterator(vectorizedReader)
-          // SPARK-23457 Register a task completion listener before `initialization`.
-          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-          vectorizedReader.initialize(split, hadoopAttemptContext)
+        val iter = new RecordReaderIterator(vectorizedReader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+        vectorizedReader.initialize(split, hadoopAttemptContext)
+
+        // NOTE: We're making appending of the partitioned values to the rows read from the
+        //       data file configurable
+        if (shouldAppendPartitionValues) {
           logDebug(s"Appending $partitionSchema ${file.partitionValues}")
           vectorizedReader.initBatch(partitionSchema, file.partitionValues)
-          if (returningBatch) {
-            vectorizedReader.enableReturningBatches()
-          }
+        } else {
+          vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
+        }
+
+        if (returningBatch) {
+          vectorizedReader.enableReturningBatches()
+        }
 
-          // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-          iter.asInstanceOf[Iterator[InternalRow]]
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
+      } else {
+        logDebug(s"Falling back to parquet-mr")
+        // ParquetRecordReader returns InternalRow
+        val readSupport = new ParquetReadSupport(
+          convertTz,
+          enableVectorizedReader = false,
+          datetimeRebaseMode,
+          int96RebaseMode)
+        val reader = if (pushed.isDefined && enableRecordFilter) {
+          val parquetFilter = FilterCompat.get(pushed.get, null)
+          new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
         } else {
-          logDebug(s"Falling back to parquet-mr")
-          // ParquetRecordReader returns InternalRow
-          val readSupport = new ParquetReadSupport(
-            convertTz,
-            enableVectorizedReader = false,
-            datetimeRebaseMode,
-            int96RebaseMode)
-          val reader = if (pushed.isDefined && enableRecordFilter) {
-            val parquetFilter = FilterCompat.get(pushed.get, null)
-            new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
-          } else {
-            new ParquetRecordReader[InternalRow](readSupport)
-          }
-          val iter = new RecordReaderIterator[InternalRow](reader)
-          // SPARK-23457 Register a task completion listener before `initialization`.
-          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-          reader.initialize(split, hadoopAttemptContext)
-
-          val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-          val unsafeProjection = if (typeChangeInfos.isEmpty) {
-            GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-          } else {
-            // find type changed.
-            val newFullSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
-              if (typeChangeInfos.containsKey(i)) {
-                StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata)
-              } else f
-            }).toAttributes ++ partitionSchema.toAttributes
-            val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
-              if (typeChangeInfos.containsKey(i)) {
-                Cast(attr, typeChangeInfos.get(i).getLeft)
-              } else attr
-            }
-            GenerateUnsafeProjection.generate(castSchema, newFullSchema)
-          }
+          new ParquetRecordReader[InternalRow](readSupport)
+        }
+        val iter = new RecordReaderIterator[InternalRow](reader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+        reader.initialize(split, hadoopAttemptContext)
 
-          if (partitionSchema.length == 0) {
-            // There is no partition columns
-            iter.map(unsafeProjection)
-          } else {
-            val joinedRow = new JoinedRow()
-            iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+        val unsafeProjection = if (typeChangeInfos.isEmpty) {
+          GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+        } else {
+          // find type changed.
+          val newFullSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
+            if (typeChangeInfos.containsKey(i)) {
+              StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata)
+            } else f
+          }).toAttributes ++ partitionSchema.toAttributes
+          val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
+            if (typeChangeInfos.containsKey(i)) {
+              Cast(attr, typeChangeInfos.get(i).getLeft)
+            } else attr
           }
+          GenerateUnsafeProjection.generate(castSchema, newFullSchema)
+        }
+
+        // NOTE: We're making appending of the partitioned values to the rows read from the
+        //       data file configurable
+        if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
+          // There is no partition columns
+          iter.map(unsafeProjection)
+        } else {
+          val joinedRow = new JoinedRow()
+          iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
         }
       }
     }
@@ -300,6 +333,16 @@ object Spark312HoodieParquetFileFormat {
 
   val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
 
+  def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
+    val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+    if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
+      val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
+      SerDeHelper.toJson(prunedSchema)
+    } else {
+      internalSchemaStr
+    }
+  }
+
   private def createParquetFilters(arg: Any*): ParquetFilters = {
     val clazz = Class.forName(PARQUET_FILTERS_CLASS_NAME, true, Thread.currentThread().getContextClassLoader)
     val ctor = clazz.getConstructors.head
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index bad392b4f9..15624c7411 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32HoodieParquetFileFormat}
 import org.apache.spark.sql.parser.HoodieSpark3_2ExtendedSqlParser
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_2CatalystExpressionUtils, SparkSession}
@@ -80,14 +80,7 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
     }
   }
 
-  override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
-    if (SPARK_VERSION.startsWith("3.2")) {
-      val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat"
-      val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
-      val ctor = clazz.getConstructors.head
-      Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
-    } else {
-      None
-    }
+  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark32HoodieParquetFileFormat(appendPartitionValues))
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index 28db473965..f2a0a21df8 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.net.URI
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.FileSplit
@@ -27,6 +25,7 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
@@ -34,226 +33,266 @@ import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
-      // fallback to origin parquet File read
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-    } else {
-      hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
-      hadoopConf.set(
-        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        ParquetWriteSupport.SPARK_ROW_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        SQLConf.SESSION_LOCAL_TIMEZONE.key,
-        sparkSession.sessionState.conf.sessionLocalTimeZone)
-      hadoopConf.setBoolean(
-        SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-        sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
-      hadoopConf.setBoolean(
-        SQLConf.CASE_SENSITIVE.key,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-      ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
-
-      // Sets flags for `ParquetToSparkSchemaConverter`
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        sparkSession.sessionState.conf.isParquetBinaryAsString)
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
-      // for dataSource v1, we have no method to do project for spark physical plan.
-      // it's safe to do cols project here.
-      val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-      val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-      if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
-        val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
-        hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema))
+import java.net.URI
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[AvroDeserializer]] impl from Spark 3.2.1 w/ w/ the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data file</li>
+ *   <li>Schema on-read</li>
+ * </ol>
+ */
+class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+    val internalSchemaStr = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+    // For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself,
+    // therefore it's safe to do schema projection here
+    if (!isNullOrEmpty(internalSchemaStr)) {
+      val prunedInternalSchemaStr =
+        pruneInternalSchema(internalSchemaStr, requiredSchema)
+      hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
+    }
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    // TODO: if you move this into the closure it reverts to the default values.
+    // If true, enable using the custom RecordReader for parquet. This only works for
+    // a subset of the types (no complex types).
+    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+    val enableVectorizedReader: Boolean =
+      sqlConf.parquetVectorizedReaderEnabled &&
+        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
+    val capacity = sqlConf.parquetVectorizedReaderBatchSize
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
+    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+
+    (file: PartitionedFile) => {
+      assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
+
+      val filePath = new Path(new URI(file.filePath))
+      val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
+
+      val sharedConf = broadcastedHadoopConf.value.value
+
+      // Fetch internal schema
+      val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+      // Internal schema has to be pruned at this point
+      val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+
+      val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
+
+      val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+      val fileSchema = if (shouldUseInternalSchema) {
+        val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+        InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+      } else {
+        null
       }
-      val broadcastedHadoopConf =
-        sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-
-      // TODO: if you move this into the closure it reverts to the default values.
-      // If true, enable using the custom RecordReader for parquet. This only works for
-      // a subset of the types (no complex types).
-      val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
-      val sqlConf = sparkSession.sessionState.conf
-      val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-      val enableVectorizedReader: Boolean =
-        sqlConf.parquetVectorizedReaderEnabled &&
-          resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
-      val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
-      val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
-      val capacity = sqlConf.parquetVectorizedReaderBatchSize
-      val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
-      // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-      val returningBatch = supportBatch(sparkSession, resultSchema)
-      val pushDownDate = sqlConf.parquetFilterPushDownDate
-      val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
-      val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-      val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
-      val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-      val isCaseSensitive = sqlConf.caseSensitiveAnalysis
-      val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
-      val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
-      val int96RebaseModeInread = parquetOptions.int96RebaseModeInRead
-
-      (file: PartitionedFile) => {
-        assert(file.partitionValues.numFields == partitionSchema.size)
-        val filePath = new Path(new URI(file.filePath))
-        val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
-        val sharedConf = broadcastedHadoopConf.value.value
-        // do deal with internalSchema
-        val internalSchemaString = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-        // querySchema must be a pruned schema.
-        val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-        val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty || !querySchemaOption.isPresent) false else true
-        val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
-        val fileSchema = if (internalSchemaChangeEnabled) {
-          val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-          InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
-        } else {
-          // this should not happened, searchSchemaAndCache will deal with correctly.
-          null
-        }
 
-        lazy val footerFileMetaData =
-          ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-        val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
-          footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
-        // Try to push down filters when filter push-down is enabled.
-        val pushed = if (enableParquetFilterPushDown) {
-          val parquetSchema = footerFileMetaData.getSchema
-          val parquetFilters = new ParquetFilters(
-            parquetSchema,
-            pushDownDate,
-            pushDownTimestamp,
-            pushDownDecimal,
-            pushDownStringStartWith,
-            pushDownInFilterThreshold,
-            isCaseSensitive,
-            datetimeRebaseSpec)
-          filters.map(Spark32HoodieParquetFileFormat.rebuildFilterFromParquet(_, fileSchema, querySchemaOption.get()))
-            // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-            // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-            // is used here.
-            .flatMap(parquetFilters.createFilter(_))
-            .reduceOption(FilterApi.and)
+      lazy val footerFileMetaData =
+        ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+        footerFileMetaData.getKeyValueMetaData.get,
+        datetimeRebaseModeInRead)
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        val parquetSchema = footerFileMetaData.getSchema
+        val parquetFilters = new ParquetFilters(
+          parquetSchema,
+          pushDownDate,
+          pushDownTimestamp,
+          pushDownDecimal,
+          pushDownStringStartWith,
+          pushDownInFilterThreshold,
+          isCaseSensitive,
+          datetimeRebaseSpec)
+        filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(parquetFilters.createFilter)
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
+
+      // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
+      // *only* if the file was created by something other than "parquet-mr", so check the actual
+      // writer here for this file.  We have to do this per-file, as each file in the table may
+      // have different writers.
+      // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+      def isCreatedByParquetMr: Boolean =
+        footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+      val convertTz =
+        if (timestampConversion && !isCreatedByParquetMr) {
+          Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
         } else {
           None
         }
 
-        // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
-        // *only* if the file was created by something other than "parquet-mr", so check the actual
-        // writer here for this file.  We have to do this per-file, as each file in the table may
-        // have different writers.
-        // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
-        def isCreatedByParquetMr: Boolean =
-          footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
-
-        val convertTz =
-          if (timestampConversion && !isCreatedByParquetMr) {
-            Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+        footerFileMetaData.getKeyValueMetaData.get,
+        int96RebaseModeInRead)
+
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+
+      // Clone new conf
+      val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
+      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
+      if (shouldUseInternalSchema) {
+        val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
+        val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
+        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+        hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+      }
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
+
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      if (pushed.isDefined) {
+        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+      }
+      val taskContext = Option(TaskContext.get())
+      if (enableVectorizedReader) {
+        val vectorizedReader =
+          if (shouldUseInternalSchema) {
+            new Spark32HoodieVectorizedParquetRecordReader(
+              convertTz.orNull,
+              datetimeRebaseSpec.mode.toString,
+              datetimeRebaseSpec.timeZone,
+              int96RebaseSpec.mode.toString,
+              int96RebaseSpec.timeZone,
+              enableOffHeapColumnVector && taskContext.isDefined,
+              capacity,
+              typeChangeInfos)
           } else {
-            None
+            new VectorizedParquetRecordReader(
+              convertTz.orNull,
+              datetimeRebaseSpec.mode.toString,
+              datetimeRebaseSpec.timeZone,
+              int96RebaseSpec.mode.toString,
+              int96RebaseSpec.timeZone,
+              enableOffHeapColumnVector && taskContext.isDefined,
+              capacity)
           }
-        val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
-          footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInread)
-
-        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-        // use new conf
-        val hadoopAttempConf = new Configuration(broadcastedHadoopConf.value.value)
+        // SPARK-37089: We cannot register a task completion listener to close this iterator here
+        // because downstream exec nodes have already registered their listeners. Since listeners
+        // are executed in reverse order of registration, a listener registered here would close the
+        // iterator while downstream exec nodes are still running. When off-heap column vectors are
+        // enabled, this can cause a use-after-free bug leading to a segfault.
         //
-        // reset request schema
-        var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-        if (internalSchemaChangeEnabled) {
-          val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
-          val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-          typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
-          hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
-        }
-        val hadoopAttemptContext =
-          new TaskAttemptContextImpl(hadoopAttempConf, attemptId)
+        // Instead, we use FileScanRDD's task completion listener to close this iterator.
+        val iter = new RecordReaderIterator(vectorizedReader)
+        try {
+          vectorizedReader.initialize(split, hadoopAttemptContext)
 
-        // Try to push down filters when filter push-down is enabled.
-        // Notice: This push-down is RowGroups level, not individual records.
-        if (pushed.isDefined) {
-          ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
-        }
-        val taskContext = Option(TaskContext.get())
-        if (enableVectorizedReader) {
-          val vectorizedReader = new Spark32HoodieVectorizedParquetRecordReader(
-            convertTz.orNull,
-            datetimeRebaseSpec.mode.toString,
-            datetimeRebaseSpec.timeZone,
-            int96RebaseSpec.mode.toString,
-            int96RebaseSpec.timeZone,
-            enableOffHeapColumnVector && taskContext.isDefined,
-            capacity, typeChangeInfos)
-          val iter = new RecordReaderIterator(vectorizedReader)
-          // SPARK-23457 Register a task completion listener before `initialization`.
-          //          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-          try {
-            vectorizedReader.initialize(split, hadoopAttemptContext)
+          // NOTE: We're making appending of the partitioned values to the rows read from the
+          //       data file configurable
+          if (shouldAppendPartitionValues) {
             logDebug(s"Appending $partitionSchema ${file.partitionValues}")
             vectorizedReader.initBatch(partitionSchema, file.partitionValues)
-            if (returningBatch) {
-              vectorizedReader.enableReturningBatches()
-            }
+          } else {
+            vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
+          }
 
-            // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-            iter.asInstanceOf[Iterator[InternalRow]]
-          } catch {
-            case e: Throwable =>
-              // SPARK-23457: In case there is an exception in initialization, close the iterator to
-              // avoid leaking resources.
-              iter.close()
-              throw e
+          if (returningBatch) {
+            vectorizedReader.enableReturningBatches()
           }
+
+          // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+          iter.asInstanceOf[Iterator[InternalRow]]
+        } catch {
+          case e: Throwable =>
+            // SPARK-23457: In case there is an exception in initialization, close the iterator to
+            // avoid leaking resources.
+            iter.close()
+            throw e
+        }
+      } else {
+        logDebug(s"Falling back to parquet-mr")
+        // ParquetRecordReader returns InternalRow
+        val readSupport = new ParquetReadSupport(
+          convertTz,
+          enableVectorizedReader = false,
+          datetimeRebaseSpec,
+          int96RebaseSpec)
+        val reader = if (pushed.isDefined && enableRecordFilter) {
+          val parquetFilter = FilterCompat.get(pushed.get, null)
+          new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
         } else {
-          logDebug(s"Falling back to parquet-mr")
-          // ParquetRecordReader returns InternalRow
-          val readSupport = new ParquetReadSupport(
-            convertTz,
-            enableVectorizedReader = false,
-            datetimeRebaseSpec,
-            int96RebaseSpec)
-          val reader = if (pushed.isDefined && enableRecordFilter) {
-            val parquetFilter = FilterCompat.get(pushed.get, null)
-            new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
-          } else {
-            new ParquetRecordReader[InternalRow](readSupport)
-          }
-          val iter = new RecordReaderIterator[InternalRow](reader)
-          // SPARK-23457 Register a task completion listener before `initialization`.
-          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+          new ParquetRecordReader[InternalRow](readSupport)
+        }
+        val iter = new RecordReaderIterator[InternalRow](reader)
+        try {
           reader.initialize(split, hadoopAttemptContext)
 
           val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
@@ -274,13 +313,21 @@ class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
             GenerateUnsafeProjection.generate(castSchema, newFullSchema)
           }
 
-          if (partitionSchema.length == 0) {
+          // NOTE: We're making appending of the partitioned values to the rows read from the
+          //       data file configurable
+          if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
             // There is no partition columns
             iter.map(unsafeProjection)
           } else {
             val joinedRow = new JoinedRow()
             iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
           }
+        } catch {
+          case e: Throwable =>
+            // SPARK-23457: In case there is an exception in initialization, close the iterator to
+            // avoid leaking resources.
+            iter.close()
+            throw e
         }
       }
     }
@@ -289,6 +336,16 @@ class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
 
 object Spark32HoodieParquetFileFormat {
 
+  def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
+    val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+    if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
+      val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
+      SerDeHelper.toJson(prunedSchema)
+    } else {
+      internalSchemaStr
+    }
+  }
+
   private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
     if (fileSchema == null || querySchema == null) {
       oldFilter