You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/20 17:12:22 UTC

[hudi] branch release-0.11.0 updated (c0cadbca65 -> 2ec27cc436)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a change to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git


    from c0cadbca65 [HUDI-3894] Fix gcp bundle to include HBase dependencies and shading (#5349)
     new 40c80f6627 [HUDI-3902] Fallback to `HadoopFsRelation` in cases non-involving Schema Evolution (#5352)
     new a462584e04 [HUDI-3905] Add S3 related setup in Kafka Connect quick start (#5356)
     new 4c40d1645c [HUDI-3920] Fix partition path construction in metadata table validator (#5365)
     new 89a27a1ccf [HUDI-3917] Flink write task hangs if last checkpoint has no data input (#5360)
     new 5c57920fe7 [HUDI-3904] Claim RFC number for Improve timeline server (#5354)
     new fcaeaea62f [HUDI-3912] Fix lose data when rollback in flink async compact (#5357)
     new 2ec27cc436 [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns (#5364)

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../spark/sql/HoodieCatalystExpressionUtils.scala  |  20 +-
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   4 +-
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java | 112 +++++
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  80 +---
 .../hudi/common/model/HoodiePartitionMetadata.java |   5 +-
 .../hudi/common/table/HoodieTableConfig.java       |   2 +-
 .../hudi/common/table/TableSchemaResolver.java     |  17 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   2 +-
 .../hudi/common/table/TestTableSchemaResolver.java |   3 +-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  31 ++
 .../hudi/sink/append/AppendWriteFunction.java      |   2 +-
 .../sink/common/AbstractStreamWriteFunction.java   |   9 +-
 .../hudi/sink/compact/CompactionCommitSink.java    |  17 +-
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  29 +-
 hudi-kafka-connect/README.md                       |  21 +-
 ...org.apache.spark.sql.sources.DataSourceRegister |   2 +-
 .../org/apache/hudi/BaseFileOnlyRelation.scala     |  78 +++-
 .../main/scala/org/apache/hudi/DefaultSource.scala |  36 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 217 +++++----
 .../org/apache/hudi/HoodieDataSourceHelper.scala   |  20 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   4 +-
 .../org/apache/hudi/IncrementalRelation.scala      |   5 +-
 .../hudi/MergeOnReadIncrementalRelation.scala      |  12 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  13 +-
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |   3 +
 ...eFormat.scala => HoodieParquetFileFormat.scala} |  30 +-
 .../scala/org/apache/hudi/SparkDatasetMixin.scala} |  30 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |  37 +-
 .../hudi/functional/TestCOWDataSourceStorage.scala |   2 +-
 .../apache/hudi/functional/TestMORDataSource.scala |  20 +-
 .../functional/TestParquetColumnProjection.scala   |   9 +-
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |   6 +-
 .../parquet/Spark24HoodieParquetFileFormat.scala   | 229 ++++++++++
 .../apache/spark/sql/adapter/Spark3_1Adapter.scala |  22 +-
 .../parquet/Spark312HoodieParquetFileFormat.scala  | 507 +++++++++++----------
 .../apache/spark/sql/adapter/Spark3_2Adapter.scala |  13 +-
 .../parquet/Spark32HoodieParquetFileFormat.scala   | 449 ++++++++++--------
 .../utilities/HoodieMetadataTableValidator.java    |   7 +-
 rfc/README.md                                      |   1 +
 39 files changed, 1360 insertions(+), 746 deletions(-)
 create mode 100644 hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
 rename hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/{SparkHoodieParquetFileFormat.scala => HoodieParquetFileFormat.scala} (58%)
 copy hudi-spark-datasource/hudi-spark/src/{main/scala/org/apache/spark/sql/hudi/command/UuidKeyGenerator.scala => test/scala/org/apache/hudi/SparkDatasetMixin.scala} (51%)
 create mode 100644 hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala


[hudi] 03/07: [HUDI-3920] Fix partition path construction in metadata table validator (#5365)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4c40d1645cbe6827095bfe9296eefc876dd5bd7d
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Tue Apr 19 16:40:09 2022 -0700

    [HUDI-3920] Fix partition path construction in metadata table validator (#5365)
---
 .../org/apache/hudi/utilities/HoodieMetadataTableValidator.java    | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 7962085ace..80cc56a4e5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -490,7 +490,8 @@ public class HoodieMetadataTableValidator implements Serializable {
 
     // ignore partitions created by uncommitted ingestion.
     allPartitionPathsFromFS = allPartitionPathsFromFS.stream().parallel().filter(part -> {
-      HoodiePartitionMetadata hoodiePartitionMetadata = new HoodiePartitionMetadata(metaClient.getFs(), new Path(basePath, part));
+      HoodiePartitionMetadata hoodiePartitionMetadata =
+          new HoodiePartitionMetadata(metaClient.getFs(), FSUtils.getPartitionPath(basePath, part));
 
       Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
       if (instantOption.isPresent()) {
@@ -983,7 +984,7 @@ public class HoodieMetadataTableValidator implements Serializable {
         return baseFileNameList.stream().flatMap(filename ->
                 new ParquetUtils().readRangeFromParquetMetadata(
                     metaClient.getHadoopConf(),
-                    new Path(new Path(metaClient.getBasePath(), partitionPath), filename),
+                    new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath), filename),
                     allColumnNameList).stream())
             .sorted(new HoodieColumnRangeMetadataComparator())
             .collect(Collectors.toList());
@@ -1024,7 +1025,7 @@ public class HoodieMetadataTableValidator implements Serializable {
     }
 
     private Option<BloomFilterData> readBloomFilterFromFile(String partitionPath, String filename) {
-      Path path = new Path(new Path(metaClient.getBasePath(), partitionPath), filename);
+      Path path = new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath), filename);
       HoodieFileReader<IndexedRecord> fileReader;
       try {
         fileReader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path);


[hudi] 02/07: [HUDI-3905] Add S3 related setup in Kafka Connect quick start (#5356)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit a462584e04a2ba75d4a8b51aeeb203bb254d0f72
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Tue Apr 19 15:08:28 2022 -0700

    [HUDI-3905] Add S3 related setup in Kafka Connect quick start (#5356)
---
 hudi-kafka-connect/README.md | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md
index a5784139bc..f80e2c9fe1 100644
--- a/hudi-kafka-connect/README.md
+++ b/hudi-kafka-connect/README.md
@@ -48,8 +48,8 @@ $CONFLUENT_DIR/bin/confluent-hub install confluentinc/kafka-connect-hdfs:10.1.0
 cp -r $CONFLUENT_DIR/share/confluent-hub-components/confluentinc-kafka-connect-hdfs/* /usr/local/share/kafka/plugins/
 ```
 
-Now, build the packaged jar that contains all the hudi classes, including the Hudi Kafka Connector. And copy it 
-to the plugin path that contains all the other jars (`/usr/local/share/kafka/plugins/lib`)
+Now, build the packaged jar that contains all the hudi classes, including the Hudi Kafka Connector. And copy it to the
+plugin path that contains all the other jars (`/usr/local/share/kafka/plugins/lib`)
 
 ```bash
 cd $HUDI_DIR
@@ -58,8 +58,20 @@ mkdir -p /usr/local/share/kafka/plugins/lib
 cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib
 ```
 
-Set up a Kafka broker locally. Download the latest apache kafka from [here](https://kafka.apache.org/downloads). 
-Once downloaded and built, run the Zookeeper server and Kafka server using the command line tools.
+If the Hudi Sink Connector writes to a target Hudi table on [Amazon S3](https://aws.amazon.com/s3/), you need two
+additional jars, `hadoop-aws-2.10.1.jar` and `aws-java-sdk-bundle-1.11.271.jar`, in the `plugins/lib` folder. You may
+download them using the following commands. Note that, when you specify the target table path on S3, you need to use
+the `s3a://` prefix.
+
+```bash
+cd /usr/local/share/kafka/plugins/lib
+wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar
+wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.10.1/hadoop-aws-2.10.1.jar
+```
+
+Set up a Kafka broker locally. Download the latest apache kafka from [here](https://kafka.apache.org/downloads). Once
+downloaded and built, run the Zookeeper server and Kafka server using the command line tools.
+
 ```bash
 export KAFKA_HOME=/path/to/kafka_install_dir
 cd $KAFKA_HOME
@@ -67,6 +79,7 @@ cd $KAFKA_HOME
 ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
 ./bin/kafka-server-start.sh ./config/server.properties
 ```
+
 Wait until the kafka cluster is up and running.
 
 ### 2 - Set up the schema registry


[hudi] 05/07: [HUDI-3904] Claim RFC number for Improve timeline server (#5354)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 5c57920fe7a953623d4bbc3b5f50088f4f218290
Author: Zhaojing Yu <yu...@bytedance.com>
AuthorDate: Wed Apr 20 14:31:21 2022 +0800

    [HUDI-3904] Claim RFC number for Improve timeline server (#5354)
---
 rfc/README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/rfc/README.md b/rfc/README.md
index 0009a1b72b..e3f89714be 100644
--- a/rfc/README.md
+++ b/rfc/README.md
@@ -85,3 +85,4 @@ The list of all RFCs can be found here.
 | 47 | [Add Call Produce Command for Spark SQL](./rfc-47/rfc-47.md) | `UNDER REVIEW` | 
 | 48 | [LogCompaction for MOR tables](./rfc-48/rfc-48.md) | `UNDER REVIEW` | 
 | 49 | [Support sync with DataHub](./rfc-49/rfc-49.md)    | `ONGOING` |
+| 50 | [Improve Timeline Server](./rfc-50/rfc-50.md) | `UNDER REVIEW` | 


[hudi] 04/07: [HUDI-3917] Flink write task hangs if last checkpoint has no data input (#5360)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 89a27a1ccf4bdc32c60158aad3ca6e8a7068c206
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Apr 20 12:48:24 2022 +0800

    [HUDI-3917] Flink write task hangs if last checkpoint has no data input (#5360)
---
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 31 ++++++++++++++++++++++
 .../hudi/sink/append/AppendWriteFunction.java      |  2 +-
 .../sink/common/AbstractStreamWriteFunction.java   |  9 ++++++-
 3 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index b5ec08a583..023b1e6965 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.utils.HiveSyncContext;
@@ -42,6 +43,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -429,6 +431,31 @@ public class StreamWriteOperatorCoordinator
     addEventToBuffer(event);
   }
 
+  /**
+   * When there is no data for this round of checkpoint, the coordinator reuses the instant
+   * and sends the commit ack events to unblock the flushing.
+   */
+  private void sendCommitAckEvents(long checkpointId) {
+    CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
+        .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
+        .toArray(CompletableFuture<?>[]::new);
+    CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
+      if (!sendToFinishedTasks(error)) {
+        throw new HoodieException("Error while waiting for the commit ack events to finish sending", error);
+      }
+    });
+  }
+
+  /**
+   * Decides whether the given exception is caused by sending events to FINISHED tasks.
+   *
+   * <p>Ugly impl: the exception may change in the future.
+   */
+  private static boolean sendToFinishedTasks(Throwable throwable) {
+    return throwable.getCause() instanceof TaskNotRunningException
+        || throwable.getCause().getMessage().contains("running");
+  }
+
   /**
    * Commits the instant.
    */
@@ -456,6 +483,10 @@ public class StreamWriteOperatorCoordinator
     if (writeResults.size() == 0) {
       // No data has written, reset the buffer and returns early
       reset();
+      // Send commit ack event to the write function to unblock the flushing
+      // If this checkpoint has no inputs while the next checkpoint has inputs,
+      // the 'isConfirming' flag should be switched with the ack event.
+      sendCommitAckEvents(checkpointId);
       return false;
     }
     doCommit(instant, writeResults);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index a72b885a22..7b40718b35 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -123,9 +123,9 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
       writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
       instant = this.writerHelper.getInstantTime();
     } else {
-      LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
       writeStatus = Collections.emptyList();
       instant = instantToWrite(false);
+      LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, instant);
     }
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 4e8712b661..98085fa74f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -247,7 +247,7 @@ public abstract class AbstractStreamWriteFunction<I>
       // wait condition:
       // 1. there is no inflight instant
       // 2. the inflight instant does not change and the checkpoint has buffering data
-      if (instant == null || (instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant))) {
+      if (instant == null || invalidInstant(instant, hasData)) {
         // sleep for a while
         timeWait.waitFor();
         // refresh the inflight instant
@@ -260,4 +260,11 @@ public abstract class AbstractStreamWriteFunction<I>
     }
     return instant;
   }
+
+  /**
+   * Returns whether the pending instant is invalid to write with.
+   */
+  private boolean invalidInstant(String instant, boolean hasData) {
+    return instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant);
+  }
 }


[hudi] 07/07: [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns (#5364)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2ec27cc436dcd70a61a628c3228a9c3096eb3361
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Wed Apr 20 04:30:27 2022 -0700

    [HUDI-3204] Fixing partition-values being derived from partition-path instead of source columns (#5364)
    
     - Scaffolded `Spark24HoodieParquetFileFormat` extending `ParquetFileFormat` and overriding the behavior of adding partition columns to every row
     - Amended `SparkAdapter`s `createHoodieParquetFileFormat` API to be able to configure whether to append partition values or not
     - Fallback to append partition values in cases when the source columns are not persisted in data-file
     - Fixing HoodieBaseRelation incorrectly handling mandatory columns
---
 .../spark/sql/HoodieCatalystExpressionUtils.scala  |  20 +-
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   4 +-
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java | 112 +++++
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  80 +---
 .../hudi/common/model/HoodiePartitionMetadata.java |   5 +-
 .../hudi/common/table/HoodieTableConfig.java       |   2 +-
 .../hudi/common/table/TableSchemaResolver.java     |  17 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   2 +-
 .../hudi/common/table/TestTableSchemaResolver.java |   3 +-
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  29 +-
 ...org.apache.spark.sql.sources.DataSourceRegister |   2 +-
 .../org/apache/hudi/BaseFileOnlyRelation.scala     |  36 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 208 +++++----
 .../org/apache/hudi/HoodieDataSourceHelper.scala   |  20 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   4 +-
 .../org/apache/hudi/IncrementalRelation.scala      |   5 +-
 .../hudi/MergeOnReadIncrementalRelation.scala      |  12 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  13 +-
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |   3 +
 ...eFormat.scala => HoodieParquetFileFormat.scala} |  30 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |  28 +-
 .../hudi/functional/TestCOWDataSourceStorage.scala |   1 -
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |   6 +-
 .../parquet/Spark24HoodieParquetFileFormat.scala   | 229 ++++++++++
 .../apache/spark/sql/adapter/Spark3_1Adapter.scala |  22 +-
 .../parquet/Spark312HoodieParquetFileFormat.scala  | 507 +++++++++++----------
 .../apache/spark/sql/adapter/Spark3_2Adapter.scala |  13 +-
 .../parquet/Spark32HoodieParquetFileFormat.scala   | 449 ++++++++++--------
 28 files changed, 1166 insertions(+), 696 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
index fe30f61b92..a3b9c210b9 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
@@ -18,12 +18,30 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SubqueryExpression, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.types.StructType
 
 trait HoodieCatalystExpressionUtils {
 
+  /**
+   * Generates instance of [[UnsafeProjection]] projecting row of one [[StructType]] into another [[StructType]]
+   *
+   * NOTE: No safety checks are executed to validate that this projection is actually feasible,
+   *       it's up to the caller to make sure that such projection is possible.
+   *
+   * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if
+   *       B is a subset of A
+   */
+  def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
+    val attrs = from.toAttributes
+    val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
+    val targetExprs = to.fields.map(f => attrsMap(f.name))
+
+    GenerateUnsafeProjection.generate(targetExprs, attrs)
+  }
+
   /**
    * Parses and resolves expression against the attributes of the given table schema.
    *
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index d8ed173547..a97743e62f 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -177,7 +177,7 @@ trait SparkAdapter extends Serializable {
   def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan]
 
   /**
-    * Create hoodie parquet file format.
+    * Create instance of [[ParquetFileFormat]]
     */
-  def createHoodieParquetFileFormat(): Option[ParquetFileFormat]
+  def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat]
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
new file mode 100644
index 0000000000..dd14dca671
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+public class AvroSchemaUtils {
+
+  private AvroSchemaUtils() {}
+
+  /**
+   * Appends provided new fields at the end of the given schema
+   *
+   * NOTE: No deduplication is made, this method simply appends fields at the end of the list
+   *       of the source schema as is
+   */
+  public static Schema appendFieldsToSchema(Schema schema, List<Schema.Field> newFields) {
+    List<Schema.Field> fields = schema.getFields().stream()
+        .map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
+        .collect(Collectors.toList());
+    fields.addAll(newFields);
+
+    Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
+    newSchema.setFields(fields);
+    return newSchema;
+  }
+
+  /**
+   * Given a {@code Union} schema, tries to resolve the member type whose full name is {@code fieldSchemaFullName}
+   * w/in the union, returning its corresponding schema; a non-union schema is returned as is
+   *
+   * @param schema target schema to be inspected
+   * @param fieldSchemaFullName target field-name to be looked up w/in the union
+   * @return schema of the field w/in the union identified by the {@code fieldSchemaFullName}
+   */
+  public static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullName) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return schema;
+    }
+
+    List<Schema> innerTypes = schema.getTypes();
+    Schema nonNullType =
+        innerTypes.stream()
+            .filter(it -> it.getType() != Schema.Type.NULL && Objects.equals(it.getFullName(), fieldSchemaFullName))
+            .findFirst()
+            .orElse(null);
+
+    if (nonNullType == null) {
+      throw new AvroRuntimeException(
+          String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
+    }
+
+    return nonNullType;
+  }
+
+  /**
+   * Resolves typical Avro's nullable schema definition: {@code Union(Schema.Type.NULL, <NonNullType>)},
+   * decomposing union and returning the target non-null type
+   */
+  public static Schema resolveNullableSchema(Schema schema) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return schema;
+    }
+
+    List<Schema> innerTypes = schema.getTypes();
+    Schema nonNullType =
+        innerTypes.stream()
+            .filter(it -> it.getType() != Schema.Type.NULL)
+            .findFirst()
+            .orElse(null);
+
+    if (innerTypes.size() != 2 || nonNullType == null) {
+      throw new AvroRuntimeException(
+          String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
+    }
+
+    return nonNullType;
+  }
+
+  /**
+   * Creates schema following Avro's typical nullable schema definition: {@code Union(Schema.Type.NULL, <NonNullType>)},
+   * wrapping around provided target non-null type
+   */
+  public static Schema createNullableSchema(Schema.Type avroType) {
+    checkState(avroType != Schema.Type.NULL);
+    return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(avroType));
+  }
+
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 9367e23dc6..bf540a302e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.avro;
 
 import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.Conversions;
 import org.apache.avro.Conversions.DecimalConversion;
 import org.apache.avro.JsonProperties;
@@ -27,6 +26,7 @@ import org.apache.avro.LogicalTypes;
 import org.apache.avro.LogicalTypes.Decimal;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
@@ -64,19 +64,19 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.HashMap;
 import java.util.TimeZone;
-import java.util.Iterator;
-
 import java.util.stream.Collectors;
 
 import static org.apache.avro.Schema.Type.UNION;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
 
 /**
  * Helper class to do common stuff across Avro.
@@ -97,8 +97,7 @@ public class HoodieAvroUtils {
   private static final String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";
 
   // All metadata fields are optional strings.
-  public static final Schema METADATA_FIELD_SCHEMA =
-      Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
+  public static final Schema METADATA_FIELD_SCHEMA = createNullableSchema(Schema.Type.STRING);
 
   public static final Schema RECORD_KEY_SCHEMA = initRecordKeySchema();
 
@@ -327,31 +326,6 @@ public class HoodieAvroUtils {
     return record;
   }
 
-  /**
-   * Add null fields to passed in schema. Caller is responsible for ensuring there is no duplicates. As different query
-   * engines have varying constraints regarding treating the case-sensitivity of fields, its best to let caller
-   * determine that.
-   *
-   * @param schema        Passed in schema
-   * @param newFieldNames Null Field names to be added
-   */
-  public static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
-    List<Field> newFields = new ArrayList<>();
-    for (String newField : newFieldNames) {
-      newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE));
-    }
-    return createNewSchemaWithExtraFields(schema, newFields);
-  }
-
-  public static Schema createNewSchemaWithExtraFields(Schema schema, List<Field> newFields) {
-    List<Field> fields = schema.getFields().stream()
-        .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultVal())).collect(Collectors.toList());
-    fields.addAll(newFields);
-    Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
-    newSchema.setFields(fields);
-    return newSchema;
-  }
-
   /**
    * Adds the Hoodie commit metadata into the provided Generic Record.
    */
@@ -736,46 +710,6 @@ public class HoodieAvroUtils {
     return getRecordColumnValues(record, columns, schema.get(), consistentLogicalTimestampEnabled);
   }
 
-  private static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullName) {
-    if (schema.getType() != Schema.Type.UNION) {
-      return schema;
-    }
-
-    List<Schema> innerTypes = schema.getTypes();
-    Schema nonNullType =
-        innerTypes.stream()
-            .filter(it -> it.getType() != Schema.Type.NULL && Objects.equals(it.getFullName(), fieldSchemaFullName))
-            .findFirst()
-            .orElse(null);
-
-    if (nonNullType == null) {
-      throw new AvroRuntimeException(
-          String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
-    }
-
-    return nonNullType;
-  }
-
-  public static Schema resolveNullableSchema(Schema schema) {
-    if (schema.getType() != Schema.Type.UNION) {
-      return schema;
-    }
-
-    List<Schema> innerTypes = schema.getTypes();
-    Schema nonNullType =
-        innerTypes.stream()
-          .filter(it -> it.getType() != Schema.Type.NULL)
-          .findFirst()
-          .orElse(null);
-
-    if (innerTypes.size() != 2 || nonNullType == null) {
-      throw new AvroRuntimeException(
-          String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
-    }
-
-    return nonNullType;
-  }
-
   /**
    * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema.
    * support deep rewrite for nested record.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index 93e9ea5d34..89bad1c33f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -213,7 +213,7 @@ public class HoodiePartitionMetadata {
       format = Option.empty();
       return true;
     } catch (Throwable t) {
-      LOG.warn("Unable to read partition meta properties file for partition " + partitionPath, t);
+      LOG.debug("Unable to read partition meta properties file for partition " + partitionPath);
       return false;
     }
   }
@@ -229,8 +229,7 @@ public class HoodiePartitionMetadata {
         format = Option.of(reader.getFormat());
         return true;
       } catch (Throwable t) {
-        // any error, log, check the next base format
-        LOG.warn("Unable to read partition metadata " + metafilePath.getName() + " for partition " + partitionPath, t);
+        LOG.debug("Unable to read partition metadata " + metafilePath.getName() + " for partition " + partitionPath);
       }
     }
     return false;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 254044bd28..bbc508bd5f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -607,7 +607,7 @@ public class HoodieTableConfig extends HoodieConfig {
     return getString(URL_ENCODE_PARTITIONING);
   }
 
-  public Boolean isDropPartitionColumns() {
+  public Boolean shouldDropPartitionColumns() {
     return getBooleanOrDefault(DROP_PARTITION_COLUMNS);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 262157a8ae..f178a23eee 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -23,11 +23,9 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.IndexedRecord;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -47,15 +45,13 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIncompatibleSchemaException;
 import org.apache.hudi.exception.InvalidTableException;
-import org.apache.hudi.io.storage.HoodieHFileReader;
-import org.apache.hudi.io.storage.HoodieOrcReader;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
-
+import org.apache.hudi.io.storage.HoodieHFileReader;
+import org.apache.hudi.io.storage.HoodieOrcReader;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -67,6 +63,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
+
 /**
  * Helper class to read schema from data files and log files and to convert it between different formats.
  *
@@ -189,7 +188,7 @@ public class TableSchemaResolver {
     }
 
     Option<String[]> partitionFieldsOpt = metaClient.getTableConfig().getPartitionFields();
-    if (metaClient.getTableConfig().isDropPartitionColumns()) {
+    if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
       schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, schema);
     }
     return schema;
@@ -222,9 +221,9 @@ public class TableSchemaResolver {
         List<Field> newFields = new ArrayList<>();
         for (String partitionField: partitionFields) {
           newFields.add(new Schema.Field(
-              partitionField, Schema.create(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
+              partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
         }
-        schema = HoodieAvroUtils.createNewSchemaWithExtraFields(schema, newFields);
+        schema = appendFieldsToSchema(schema, newFields);
       }
     }
     return schema;
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 3904ff6f83..c0e97f3309 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -89,10 +89,10 @@ import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes;
 import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
-import static org.apache.hudi.avro.HoodieAvroUtils.resolveNullableSchema;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 59a24a79f0..e0e57e812b 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table;
 
 import org.apache.avro.Schema;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 
@@ -57,7 +58,7 @@ public class TestTableSchemaResolver {
     assertNotEquals(originSchema, s4);
     assertTrue(s4.getFields().stream().anyMatch(f -> f.name().equals("user_partition")));
     Schema.Field f = s4.getField("user_partition");
-    assertEquals(f.schema().getType().getName(), "string");
+    assertEquals(f.schema(), AvroSchemaUtils.createNullableSchema(Schema.Type.STRING));
 
     // case5: user_partition is in originSchema, but partition_path is in originSchema
     String[] pts4 = {"user_partition", "partition_path"};
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 0aa74ef154..0e4f9c304c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,13 +18,7 @@
 
 package org.apache.hudi.hadoop.utils;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
@@ -32,8 +26,8 @@ import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
@@ -46,6 +40,12 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -60,6 +60,9 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
+import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
+
 public class HoodieRealtimeRecordReaderUtils {
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);
 
@@ -287,6 +290,14 @@ public class HoodieRealtimeRecordReaderUtils {
     List<String> fieldsToAdd = partitioningFields.stream().map(String::toLowerCase)
         .filter(x -> !firstLevelFieldNames.contains(x)).collect(Collectors.toList());
 
-    return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd);
+    return appendNullSchemaFields(schema, fieldsToAdd);
+  }
+
+  private static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
+    List<Schema.Field> newFields = new ArrayList<>();
+    for (String newField : newFieldNames) {
+      newFields.add(new Schema.Field(newField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
+    }
+    return appendFieldsToSchema(schema, newFields);
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index cc8fb0492a..556b0feef1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -17,4 +17,4 @@
 
 
 org.apache.hudi.DefaultSource
-org.apache.spark.sql.execution.datasources.parquet.SparkHoodieParquetFileFormat
\ No newline at end of file
+org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index adf94fffde..5414a228c7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -20,14 +20,13 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
 import org.apache.hudi.common.model.HoodieFileFormat
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
 import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
@@ -56,6 +55,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
   override type FileSplit = HoodieBaseFileSplit
 
   override lazy val mandatoryColumns: Seq[String] =
+    // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
     Seq(recordKeyField)
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
@@ -65,14 +65,14 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
 
   protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
                                     partitionSchema: StructType,
-                                    tableSchema: HoodieTableSchema,
+                                    dataSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
                                     filters: Array[Filter]): HoodieUnsafeRDD = {
 
     val baseFileReader = createBaseFileReader(
       spark = sparkSession,
       partitionSchema = partitionSchema,
-      tableSchema = tableSchema,
+      dataSchema = dataSchema,
       requiredSchema = requiredSchema,
       filters = filters,
       options = optParams,
@@ -114,16 +114,38 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
    *       rule; you can find more details in HUDI-3896)
    */
   def toHadoopFsRelation: HadoopFsRelation = {
+    // We're delegating to Spark to append partition values to every row only in cases
+    // when these corresponding partition-values are not persisted w/in the data file itself
+    val shouldAppendPartitionColumns = shouldOmitPartitionColumns
+
     val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
-      case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+      case HoodieFileFormat.PARQUET =>
+        (sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, HoodieParquetFileFormat.FILE_FORMAT_ID)
       case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
     }
 
     if (globPaths.isEmpty) {
+      // NOTE: There are currently 2 ways partition values could be fetched:
+      //          - Source columns (producing the values used for physical partitioning) will be read
+      //          from the data file
+      //          - Values parsed from the actual partition path would be appended to the final dataset
+      //
+      //        In the former case, we don't need to provide the partition-schema to the relation,
+      //        therefore we simply stub it w/ empty schema and use full table-schema as the one being
+      //        read from the data file.
+      //
+      //        In the latter, we have to specify proper partition schema as well as "data"-schema, essentially
+      //        being a table-schema with all partition columns stripped out
+      val (partitionSchema, dataSchema) = if (shouldAppendPartitionColumns) {
+        (fileIndex.partitionSchema, fileIndex.dataSchema)
+      } else {
+        (StructType(Nil), tableStructSchema)
+      }
+
       HadoopFsRelation(
         location = fileIndex,
-        partitionSchema = fileIndex.partitionSchema,
-        dataSchema = fileIndex.dataSchema,
+        partitionSchema = partitionSchema,
+        dataSchema = dataSchema,
         bucketSpec = None,
         fileFormat = tableFileFormat,
         optParams)(sparkSession)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 53667f3b88..2fd1da5950 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation.getPartitionPath
+import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
 import org.apache.hudi.common.fs.FSUtils
@@ -36,12 +36,13 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.io.storage.HoodieHFileReader
-import org.apache.spark.TaskContext
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.HoodieAvroSchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection}
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -50,11 +51,11 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SQLContext, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
-import java.io.Closeable
 import java.net.URI
+import java.util.Locale
 import scala.collection.JavaConverters._
-import scala.util.Try
 import scala.util.control.NonFatal
+import scala.util.{Failure, Success, Try}
 
 trait HoodieFileSplit {}
 
@@ -78,7 +79,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   extends BaseRelation
     with FileRelation
     with PrunedFilteredScan
-    with SparkAdapterSupport
     with Logging {
 
   type FileSplit <: HoodieFileSplit
@@ -125,14 +125,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = {
     val schemaUtil = new TableSchemaResolver(metaClient)
-    val avroSchema = Try(schemaUtil.getTableAvroSchema).getOrElse(
-      // If there is no commit in the table, we can't get the schema
-      // t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
-      userSchema match {
-        case Some(s) => sparkAdapter.getAvroSchemaConverters.toAvroType(s, nullable = false, "record")
-        case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
-      }
-    )
+    val avroSchema = Try(schemaUtil.getTableAvroSchema) match {
+      case Success(schema) => schema
+      case Failure(e) =>
+        logWarning("Failed to fetch schema from the table", e)
+        // If there is no commit in the table, we can't get the schema
+        // t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
+        userSchema match {
+          case Some(s) => convertToAvroSchema(s)
+          case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
+        }
+    }
     // try to find internalSchema
     val internalSchemaFromMeta = try {
       schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
@@ -146,11 +149,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
 
-  /**
-   * if true, need to deal with schema for creating file reader.
-   */
-  protected val dropPartitionColumnsWhenWrite: Boolean =
-    metaClient.getTableConfig.isDropPartitionColumns && partitionColumns.nonEmpty
+  protected val shouldOmitPartitionColumns: Boolean =
+    metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
 
   /**
    * NOTE: PLEASE READ THIS CAREFULLY
@@ -205,14 +205,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    // NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES
+    //
+    //       In case list of requested columns doesn't contain the Primary Key one, we
     //       have to add it explicitly so that
     //          - Merging could be performed correctly
     //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
-    //          Spark still fetches all the rows to execute the query correctly
+    //            Spark still fetches all the rows to execute the query correctly
     //
-    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
-    //       filtered out upstream
+    //       *Appending* additional columns to the ones requested by the caller is not a problem, as those
+    //       will be "projected out" by the caller's projection;
+    //
+    // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
+    //       PROJECTION
     val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
 
     val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
@@ -223,56 +228,62 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
     val fileSplits = collectFileSplits(partitionFilters, dataFilters)
 
-    val partitionSchema = if (dropPartitionColumnsWhenWrite) {
-      // when hoodie.datasource.write.drop.partition.columns is true, partition columns can't be persisted in
-      // data files.
-      StructType(partitionColumns.map(StructField(_, StringType)))
-    } else {
-      StructType(Nil)
-    }
 
-    val tableSchema = HoodieTableSchema(tableStructSchema, if (internalSchema.isEmptySchema) tableAvroSchema.toString else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString, internalSchema)
-    val dataSchema = if (dropPartitionColumnsWhenWrite) {
-      val dataStructType = StructType(tableStructSchema.filterNot(f => partitionColumns.contains(f.name)))
-      HoodieTableSchema(
-        dataStructType,
-        sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType, nullable = false, "record").toString()
-      )
-    } else {
-      tableSchema
-    }
-    val requiredSchema = if (dropPartitionColumnsWhenWrite) {
-      val requiredStructType = StructType(requiredStructSchema.filterNot(f => partitionColumns.contains(f.name)))
-      HoodieTableSchema(
-        requiredStructType,
-        sparkAdapter.getAvroSchemaConverters.toAvroType(requiredStructType, nullable = false, "record").toString()
-      )
+    val tableAvroSchemaStr =
+      if (internalSchema.isEmptySchema) tableAvroSchema.toString
+      else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString
+
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchema)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
+
+    // Since schema requested by the caller might contain partition columns, we might need to
+    // prune it, removing all partition columns from it in case these columns are not persisted
+    // in the data files
+    //
+    // NOTE: This partition schema is only relevant to file reader to be able to embed
+    //       values of partition columns (hereafter referred to as partition values) encoded into
+    //       the partition path, and omitted from the data file, back into fetched rows;
+    //       Note that, by default, partition columns are not omitted therefore specifying
+    //       partition schema for reader is not required
+    val (partitionSchema, dataSchema, prunedRequiredSchema) =
+      tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+    if (fileSplits.isEmpty) {
+      sparkSession.sparkContext.emptyRDD
     } else {
-      HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
+      val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, prunedRequiredSchema, filters)
+
+      // NOTE: In case when partition columns have been pruned from the required schema, we have to project
+      //       the rows from the pruned schema back into the one expected by the caller
+      val projectedRDD = if (prunedRequiredSchema.structTypeSchema != requiredSchema.structTypeSchema) {
+        rdd.mapPartitions { it =>
+          val fullPrunedSchema = StructType(prunedRequiredSchema.structTypeSchema.fields ++ partitionSchema.fields)
+          val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema)
+          it.map(unsafeProjection)
+        }
+      } else {
+        rdd
+      }
+
+      // Here we rely on type erasure to work around the inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
+      // Please check [[needConversion]] scala-doc for more details
+      projectedRDD.asInstanceOf[RDD[Row]]
     }
-    // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
-    // Please check [[needConversion]] scala-doc for more details
-    if (fileSplits.nonEmpty)
-      composeRDD(fileSplits, partitionSchema, dataSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
-    else
-      sparkSession.sparkContext.emptyRDD
   }
 
-
-
   /**
    * Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied
    *
    * @param fileSplits      file splits to be handled by the RDD
    * @param partitionSchema target table's partition schema
-   * @param tableSchema     target table's schema
+   * @param dataSchema      target table's data files' schema
    * @param requiredSchema  projected schema required by the reader
    * @param filters         data filters to be applied
    * @return instance of RDD (implementing [[HoodieUnsafeRDD]])
    */
   protected def composeRDD(fileSplits: Seq[FileSplit],
                            partitionSchema: StructType,
-                           tableSchema: HoodieTableSchema,
+                           dataSchema: HoodieTableSchema,
                            requiredSchema: HoodieTableSchema,
                            filters: Array[Filter]): HoodieUnsafeRDD
 
@@ -325,16 +336,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   }
 
   protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
-    if (dropPartitionColumnsWhenWrite) {
-      if (requestedColumns.isEmpty) {
-        mandatoryColumns.toArray
-      } else {
-        requestedColumns
-      }
-    } else {
-      val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
-      requestedColumns ++ missing
-    }
+    val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
+    requestedColumns ++ missing
   }
 
   protected def getTableState: HoodieTableState = {
@@ -364,7 +367,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = {
     try {
       val tableConfig = metaClient.getTableConfig
-      if (dropPartitionColumnsWhenWrite) {
+      if (shouldOmitPartitionColumns) {
         val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString
         val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
         if (hiveStylePartitioningEnabled) {
@@ -388,40 +391,47 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
         InternalRow.empty
     }
   }
-}
 
-object HoodieBaseRelation {
-
-  def getPartitionPath(fileStatus: FileStatus): Path =
-    fileStatus.getPath.getParent
+  protected def getColName(f: StructField): String = {
+    if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
+      f.name
+    } else {
+      f.name.toLowerCase(Locale.ROOT)
+    }
+  }
 
   /**
    * Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]]
    * over [[InternalRow]]
    */
-  def createBaseFileReader(spark: SparkSession,
-                           partitionSchema: StructType,
-                           tableSchema: HoodieTableSchema,
-                           requiredSchema: HoodieTableSchema,
-                           filters: Seq[Filter],
-                           options: Map[String, String],
-                           hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+  protected def createBaseFileReader(spark: SparkSession,
+                                     partitionSchema: StructType,
+                                     dataSchema: HoodieTableSchema,
+                                     requiredSchema: HoodieTableSchema,
+                                     filters: Seq[Filter],
+                                     options: Map[String, String],
+                                     hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
     val hfileReader = createHFileReader(
       spark = spark,
-      tableSchema = tableSchema,
+      dataSchema = dataSchema,
       requiredSchema = requiredSchema,
       filters = filters,
       options = options,
       hadoopConf = hadoopConf
     )
+
+    // We're delegating to Spark to append partition values to every row only in cases
+    // when these corresponding partition-values are not persisted w/in the data file itself
+    val shouldAppendPartitionColumns = shouldOmitPartitionColumns
     val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
       sparkSession = spark,
-      dataSchema = tableSchema.structTypeSchema,
+      dataSchema = dataSchema.structTypeSchema,
       partitionSchema = partitionSchema,
       requiredSchema = requiredSchema.structTypeSchema,
       filters = filters,
       options = options,
-      hadoopConf = hadoopConf
+      hadoopConf = hadoopConf,
+      appendPartitionValues = shouldAppendPartitionColumns
     )
 
     partitionedFile => {
@@ -436,8 +446,38 @@ object HoodieBaseRelation {
     }
   }
 
+  private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
+                                       requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
+    if (shouldOmitPartitionColumns) {
+      val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
+      val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema)
+      val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema)
+
+      (partitionSchema,
+        HoodieTableSchema(prunedDataStructSchema, convertToAvroSchema(prunedDataStructSchema).toString),
+        HoodieTableSchema(prunedRequiredSchema, convertToAvroSchema(prunedRequiredSchema).toString))
+    } else {
+      (StructType(Nil), tableSchema, requiredSchema)
+    }
+  }
+
+  private def prunePartitionColumns(dataStructSchema: StructType): StructType =
+    StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))
+}
+
+object HoodieBaseRelation extends SparkAdapterSupport {
+
+  private def generateUnsafeProjection(from: StructType, to: StructType) =
+    sparkAdapter.createCatalystExpressionUtils().generateUnsafeProjection(from, to)
+
+  def convertToAvroSchema(structSchema: StructType): Schema =
+    sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record")
+
+  def getPartitionPath(fileStatus: FileStatus): Path =
+    fileStatus.getPath.getParent
+
   private def createHFileReader(spark: SparkSession,
-                                tableSchema: HoodieTableSchema,
+                                dataSchema: HoodieTableSchema,
                                 requiredSchema: HoodieTableSchema,
                                 filters: Seq[Filter],
                                 options: Map[String, String],
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index 02264bc4a6..1fc9e70a5a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -21,6 +21,7 @@ package org.apache.hudi
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.utils.SerDeHelper
 import org.apache.spark.sql.SparkSession
@@ -38,8 +39,8 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
 
 
   /**
-   * Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]]
-   * to deal with [[ColumnarBatch]] when enable parquet vectorized reader if necessary.
+   * Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]] handling [[ColumnarBatch]],
+   * when Parquet's Vectorized Reader is used
    */
   def buildHoodieParquetReader(sparkSession: SparkSession,
                                dataSchema: StructType,
@@ -47,9 +48,11 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
                                requiredSchema: StructType,
                                filters: Seq[Filter],
                                options: Map[String, String],
-                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+                               hadoopConf: Configuration,
+                               appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = {
 
-    val readParquetFile: PartitionedFile => Iterator[Any] = sparkAdapter.createHoodieParquetFileFormat().get.buildReaderWithPartitionValues(
+    val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
+    val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues(
       sparkSession = sparkSession,
       dataSchema = dataSchema,
       partitionSchema = partitionSchema,
@@ -91,9 +94,12 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
     * @param validCommits valid commits, using the given validCommits to validate all legal history schema files, and return the latest one.
     */
   def getConfigurationWithInternalSchema(conf: Configuration, internalSchema: InternalSchema, tablePath: String, validCommits: String): Configuration = {
-    conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
-    conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath)
-    conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
+    val querySchemaString = SerDeHelper.toJson(internalSchema) // serialized once; reused for the check and the config value
+    if (!isNullOrEmpty(querySchemaString)) { // leave conf untouched when the serialized schema is null/empty
+      conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, querySchemaString)
+      conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath)
+      conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
+    }
     conf
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c86b1615ba..38062aa802 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -88,7 +88,7 @@ object HoodieSparkSqlWriter {
 
     val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
     val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
-    val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestmapBasedKeyGenerator(
+    val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
       originKeyGeneratorClassName, parameters)
     //validate datasource and tableconfig keygen are the same
     validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
@@ -758,7 +758,7 @@ object HoodieSparkSqlWriter {
     (params, HoodieWriterUtils.convertMapToHoodieConfig(params))
   }
 
-  private def extractConfigsRelatedToTimestmapBasedKeyGenerator(keyGenerator: String,
+  private def extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenerator: String,
       params: Map[String, String]): Map[String, String] = {
     if (keyGenerator.equals(classOf[TimestampBasedKeyGenerator].getCanonicalName) ||
         keyGenerator.equals(classOf[TimestampBasedAvroKeyGenerator].getCanonicalName)) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 039dafb596..d9d5812adb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -20,8 +20,8 @@ package org.apache.hudi
 import org.apache.avro.Schema
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieFileFormat, HoodieRecord, HoodieReplaceCommitMetadata}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-import java.util.stream.Collectors
 
+import java.util.stream.Collectors
 import org.apache.hadoop.fs.{GlobPattern, Path}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -36,6 +36,7 @@ import org.apache.hudi.table.HoodieSparkTable
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
@@ -183,7 +184,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
       sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath)
       sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
       val formatClassName = metaClient.getTableConfig.getBaseFileFormat match {
-        case HoodieFileFormat.PARQUET => if (!internalSchema.isEmptySchema) "HoodieParquet" else "parquet"
+        case HoodieFileFormat.PARQUET => HoodieParquetFileFormat.FILE_FORMAT_ID
         case HoodieFileFormat.ORC => "orc"
       }
       sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 46e395fc2b..6aa7007851 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -19,9 +19,7 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{GlobPattern, Path}
-import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -61,14 +59,14 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
 
   protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
                                     partitionSchema: StructType,
-                                    tableSchema: HoodieTableSchema,
+                                    dataSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
                                     filters: Array[Filter]): HoodieMergeOnReadRDD = {
     val fullSchemaParquetReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
-      tableSchema = tableSchema,
-      requiredSchema = tableSchema,
+      dataSchema = dataSchema,
+      requiredSchema = dataSchema,
       // This file-reader is used to read base file records, subsequently merging them with the records
       // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
       // applying any user-defined filtering _before_ we complete combining them w/ delta-log records (to make sure that
@@ -86,7 +84,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
     val requiredSchemaParquetReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
-      tableSchema = tableSchema,
+      dataSchema = dataSchema,
       requiredSchema = requiredSchema,
       filters = filters ++ incrementalSpanRecordFilters,
       options = optParams,
@@ -99,7 +97,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
     // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
     //                 filtered, since file-reader might not be capable to perform filtering
     new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
-      tableSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
+      dataSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
   }
 
   override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index d85788e25b..a88eb63036 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -20,17 +20,14 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
-import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.sources.Filter
@@ -63,14 +60,14 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
   protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
                                     partitionSchema: StructType,
-                                    tableSchema: HoodieTableSchema,
+                                    dataSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
                                     filters: Array[Filter]): HoodieMergeOnReadRDD = {
     val fullSchemaParquetReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
-      tableSchema = tableSchema,
-      requiredSchema = tableSchema,
+      dataSchema = dataSchema,
+      requiredSchema = dataSchema,
       // This file-reader is used to read base file records, subsequently merging them with the records
       // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
       // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that
@@ -85,7 +82,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
     val requiredSchemaParquetReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
-      tableSchema = tableSchema,
+      dataSchema = dataSchema,
       requiredSchema = requiredSchema,
       filters = filters,
       options = optParams,
@@ -96,7 +93,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
     val tableState = getTableState
     new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
-      tableSchema, requiredSchema, tableState, mergeType, fileSplits)
+      dataSchema, requiredSchema, tableState, mergeType, fileSplits)
   }
 
   protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 1305323bd1..cd1c1fb4af 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -120,6 +120,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name)))
   }
 
+  /**
+   * @VisibleForTesting
+   */
   def partitionSchema: StructType = {
     if (queryAsNonePartitionedTable) {
       // If we read it as Non-Partitioned table, we should not
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
similarity index 58%
rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala
rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
index 150178ea69..dbb62d089e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala
@@ -23,26 +23,32 @@ import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat.FILE_FORMAT_ID
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 
-class SparkHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
-  override def shortName(): String = "HoodieParquet"
+class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
 
-  override def toString: String = "HoodieParquet"
+  override def shortName(): String = FILE_FORMAT_ID
 
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+  override def toString: String = "Hoodie-Parquet"
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
     sparkAdapter
-      .createHoodieParquetFileFormat().get
+      .createHoodieParquetFileFormat(appendPartitionValues = false).get
       .buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
   }
 }
 
+object HoodieParquetFileFormat {
+
+  val FILE_FORMAT_ID = "hoodie-parquet"
+
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index b232ef010f..28a6dcdcd6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -747,7 +747,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(resultSchema, schema1)
   }
 
-  @ParameterizedTest @ValueSource(booleans = Array(true, false))
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
   def testCopyOnWriteWithDropPartitionColumns(enableDropPartitionColumns: Boolean) {
     val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 100)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -897,9 +898,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
       readResult.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(","))
   }
 
-  @Disabled("HUDI-3204")
-  @Test
-  def testHoodieBaseFileOnlyViewRelation(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testHoodieBaseFileOnlyViewRelation(useGlobbing: Boolean): Unit = {
     val _spark = spark
     import _spark.implicits._
 
@@ -925,18 +926,27 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .mode(org.apache.spark.sql.SaveMode.Append)
       .save(basePath)
 
-    val res = spark.read.format("hudi").load(basePath)
+    // NOTE: We're testing here that both paths are appropriately handling
+    //       partition values, regardless of whether we're reading the table
+    //       through a globbed path or not
+    val path = if (useGlobbing) {
+      s"$basePath/*/*/*/*"
+    } else {
+      basePath
+    }
+
+    val res = spark.read.format("hudi").load(path)
 
     assert(res.count() == 2)
 
     // data_date is the partition field. Persist to the parquet file using the origin values, and read it.
     assertEquals(
-      res.select("data_date").map(_.get(0).toString).collect().sorted,
-      Array("2018-09-23", "2018-09-24")
+      res.select("data_date").map(_.get(0).toString).collect().sorted.toSeq,
+      Seq("2018-09-23", "2018-09-24")
     )
     assertEquals(
-      res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted,
-      Array("2018/09/23", "2018/09/24")
+      res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq,
+      Seq("2018/09/23", "2018/09/24")
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index bd7edd4db5..48bb46f81b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -57,7 +57,6 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"
 
-  @Disabled("HUDI-3896")
   @ParameterizedTest
   @CsvSource(Array(
     "true,org.apache.hudi.keygen.SimpleKeyGenerator",
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index e4b3c4010a..0e74c997d7 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil}
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
@@ -165,7 +165,7 @@ class Spark2Adapter extends SparkAdapter {
     }
   }
 
-  override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
-    Some(new ParquetFileFormat)
+  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
new file mode 100644
index 0000000000..6fb5c50c03
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[ParquetFileFormat]] impl from Spark 2.4.4 w/ the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data file</li>
+ * </ol>
+ */
+class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    // TODO: if you move this into the closure it reverts to the default values.
+    // If true, enable using the custom RecordReader for parquet. This only works for
+    // a subset of the types (no complex types).
+    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+    val enableVectorizedReader: Boolean =
+      sqlConf.parquetVectorizedReaderEnabled &&
+        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
+    val capacity = sqlConf.parquetVectorizedReaderBatchSize
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+    (file: PartitionedFile) => {
+      assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
+
+      val fileSplit =
+        new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+      val filePath = fileSplit.getPath
+
+      val split =
+        new org.apache.parquet.hadoop.ParquetInputSplit(
+          filePath,
+          fileSplit.getStart,
+          fileSplit.getStart + fileSplit.getLength,
+          fileSplit.getLength,
+          fileSplit.getLocations,
+          null)
+
+      val sharedConf = broadcastedHadoopConf.value.value
+
+      lazy val footerFileMetaData =
+        ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        val parquetSchema = footerFileMetaData.getSchema
+        val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
+          pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+        filters
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(parquetFilters.createFilter(parquetSchema, _))
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
+
+      // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps
+      // *only* if the file was created by something other than "parquet-mr", so check the actual
+      // writer here for this file.  We have to do this per-file, as each file in the table may
+      // have different writers.
+      // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+      def isCreatedByParquetMr: Boolean =
+        footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+      val convertTz =
+        if (timestampConversion && !isCreatedByParquetMr) {
+          Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+        } else {
+          None
+        }
+
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
+
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      if (pushed.isDefined) {
+        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+      }
+      val taskContext = Option(TaskContext.get())
+      if (enableVectorizedReader) {
+        val vectorizedReader = new VectorizedParquetRecordReader(
+          convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+        val iter = new RecordReaderIterator(vectorizedReader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+        vectorizedReader.initialize(split, hadoopAttemptContext)
+        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+
+        // NOTE: We're making appending of the partitioned values to the rows read from the
+        //       data file configurable
+        if (shouldAppendPartitionValues) {
+          vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+        } else {
+          vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
+        }
+
+        if (returningBatch) {
+          vectorizedReader.enableReturningBatches()
+        }
+
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
+      } else {
+        logDebug(s"Falling back to parquet-mr")
+        // ParquetRecordReader returns UnsafeRow
+        val reader = if (pushed.isDefined && enableRecordFilter) {
+          val parquetFilter = FilterCompat.get(pushed.get, null)
+          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
+        } else {
+          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+        }
+        val iter = new RecordReaderIterator(reader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+        reader.initialize(split, hadoopAttemptContext)
+
+        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+        val joinedRow = new JoinedRow()
+        val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+        // This is a horrible erasure hack...  if we type the iterator above, then it actually checks
+        // the type in next() and we get a class cast exception.  If we make that function return
+        // Object, then we can defer the cast until later!
+        //
+        // NOTE: We're making appending of the partitioned values to the rows read from the
+        //       data file configurable
+        if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
+          // There are no partition columns
+          iter.asInstanceOf[Iterator[InternalRow]]
+        } else {
+          iter.asInstanceOf[Iterator[InternalRow]]
+            .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+        }
+
+      }
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index 13dba82488..cd5cd9c82f 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -19,14 +19,13 @@
 package org.apache.spark.sql.adapter
 
 import org.apache.avro.Schema
-import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer, HoodieSparkAvroSchemaConverters}
-import org.apache.spark.sql.hudi.SparkAdapter
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils}
 import org.apache.spark.SPARK_VERSION
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark312HoodieParquetFileFormat}
+import org.apache.spark.sql.hudi.SparkAdapter
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils, SparkSession}
 
 /**
@@ -55,14 +54,7 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
     }
   }
 
-  override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
-    if (SPARK_VERSION.startsWith("3.1")) {
-      val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat"
-      val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
-      val ctor = clazz.getConstructors.head
-      Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
-    } else {
-      None
-    }
+  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark312HoodieParquetFileFormat(appendPartitionValues))
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
index 83b3162bbc..769373866f 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
@@ -17,279 +17,312 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.net.URI
-import java.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils}
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
-
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
-import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark312HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
-      // fallback to origin parquet File read
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-    } else {
-      hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
-      hadoopConf.set(
-        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        ParquetWriteSupport.SPARK_ROW_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        SQLConf.SESSION_LOCAL_TIMEZONE.key,
-        sparkSession.sessionState.conf.sessionLocalTimeZone)
-      hadoopConf.setBoolean(
-        SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-        sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
-      hadoopConf.setBoolean(
-        SQLConf.CASE_SENSITIVE.key,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-      ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
-
-      // Sets flags for `ParquetToSparkSchemaConverter`
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        sparkSession.sessionState.conf.isParquetBinaryAsString)
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
-      // for dataSource v1, we have no method to do project for spark physical plan.
-      // it's safe to do cols project here.
-      val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-      val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-      if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
-        val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
-        hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema))
+import java.net.URI
+
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of [[ParquetFileFormat]] impl from Spark 3.1.2 w/ the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data file</li>
+ *   <li>Schema on-read</li>
+ * </ol>
+ */
+class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+    val internalSchemaStr = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+    // For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself,
+    // therefore it's safe to do schema projection here
+    if (!isNullOrEmpty(internalSchemaStr)) {
+      val prunedInternalSchemaStr =
+        pruneInternalSchema(internalSchemaStr, requiredSchema)
+      hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
+    }
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    // TODO: if you move this into the closure it reverts to the default values.
+    // If true, enable using the custom RecordReader for parquet. This only works for
+    // a subset of the types (no complex types).
+    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+    val enableVectorizedReader: Boolean =
+      sqlConf.parquetVectorizedReaderEnabled &&
+        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
+    val capacity = sqlConf.parquetVectorizedReaderBatchSize
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+    (file: PartitionedFile) => {
+      assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
+
+      val filePath = new Path(new URI(file.filePath))
+      val split =
+        new org.apache.parquet.hadoop.ParquetInputSplit(
+          filePath,
+          file.start,
+          file.start + file.length,
+          file.length,
+          Array.empty,
+          null)
+
+      val sharedConf = broadcastedHadoopConf.value.value
+
+      // Fetch internal schema
+      val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+      // Internal schema has to be pruned at this point
+      val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+
+      val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
+
+      val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+      val fileSchema = if (shouldUseInternalSchema) {
+        val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+        InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+      } else {
+        null
       }
-      val broadcastedHadoopConf =
-        sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-
-      // TODO: if you move this into the closure it reverts to the default values.
-      // If true, enable using the custom RecordReader for parquet. This only works for
-      // a subset of the types (no complex types).
-      val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
-      val sqlConf = sparkSession.sessionState.conf
-      val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-      val enableVectorizedReader: Boolean =
-        sqlConf.parquetVectorizedReaderEnabled &&
-          resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
-      val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
-      val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
-      val capacity = sqlConf.parquetVectorizedReaderBatchSize
-      val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
-      // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-      val returningBatch = supportBatch(sparkSession, resultSchema)
-      val pushDownDate = sqlConf.parquetFilterPushDownDate
-      val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
-      val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-      val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
-      val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-      val isCaseSensitive = sqlConf.caseSensitiveAnalysis
-
-      (file: PartitionedFile) => {
-        assert(file.partitionValues.numFields == partitionSchema.size)
-        val filePath = new Path(new URI(file.filePath))
-        val split =
-          new org.apache.parquet.hadoop.ParquetInputSplit(
-            filePath,
-            file.start,
-            file.start + file.length,
-            file.length,
-            Array.empty,
-            null)
-        val sharedConf = broadcastedHadoopConf.value.value
-        // do deal with internalSchema
-        val internalSchemaString = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-        // querySchema must be a pruned schema.
-        val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-        val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty || !querySchemaOption.isPresent) false else true
-        val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
-        val fileSchema = if (internalSchemaChangeEnabled) {
-          val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-          InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+
+      lazy val footerFileMetaData =
+        ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        val parquetSchema = footerFileMetaData.getSchema
+        val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseMode)
         } else {
-          // this should not happened, searchSchemaAndCache will deal with correctly.
-          null
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive)
         }
+        filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(parquetFilters.createFilter)
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
 
-        lazy val footerFileMetaData =
-          ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-        val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-          footerFileMetaData.getKeyValueMetaData.get,
-          SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
-        // Try to push down filters when filter push-down is enabled.
-        val pushed = if (enableParquetFilterPushDown) {
-          val parquetSchema = footerFileMetaData.getSchema
-          val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
-            Spark312HoodieParquetFileFormat.createParquetFilters(
-              parquetSchema,
-              pushDownDate,
-              pushDownTimestamp,
-              pushDownDecimal,
-              pushDownStringStartWith,
-              pushDownInFilterThreshold,
-              isCaseSensitive,
-              datetimeRebaseMode)
-          } else {
-            Spark312HoodieParquetFileFormat.createParquetFilters(
-              parquetSchema,
-              pushDownDate,
-              pushDownTimestamp,
-              pushDownDecimal,
-              pushDownStringStartWith,
-              pushDownInFilterThreshold,
-              isCaseSensitive)
-          }
-          filters.map(Spark312HoodieParquetFileFormat.rebuildFilterFromParquet(_, fileSchema, querySchemaOption.get()))
-            // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-            // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-            // is used here.
-            .flatMap(parquetFilters.createFilter(_))
-            .reduceOption(FilterApi.and)
+      // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps
+      // *only* if the file was created by something other than "parquet-mr", so check the actual
+      // writer here for this file.  We have to do this per-file, as each file in the table may
+      // have different writers.
+      // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+      def isCreatedByParquetMr: Boolean =
+        footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+      val convertTz =
+        if (timestampConversion && !isCreatedByParquetMr) {
+          Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
         } else {
           None
         }
 
-        // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
-        // *only* if the file was created by something other than "parquet-mr", so check the actual
-        // writer here for this file.  We have to do this per-file, as each file in the table may
-        // have different writers.
-        // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
-        def isCreatedByParquetMr: Boolean =
-          footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
-
-        val convertTz =
-          if (timestampConversion && !isCreatedByParquetMr) {
-            Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
+
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+
+      // Clone new conf
+      val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
+      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
+      if (shouldUseInternalSchema) {
+        val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
+        val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
+        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+        hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+      }
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
+
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      if (pushed.isDefined) {
+        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+      }
+      val taskContext = Option(TaskContext.get())
+      if (enableVectorizedReader) {
+        val vectorizedReader =
+          if (shouldUseInternalSchema) {
+            new Spark312HoodieVectorizedParquetRecordReader(
+              convertTz.orNull,
+              datetimeRebaseMode.toString,
+              int96RebaseMode.toString,
+              enableOffHeapColumnVector && taskContext.isDefined,
+              capacity,
+              typeChangeInfos)
           } else {
-            None
+            new VectorizedParquetRecordReader(
+              convertTz.orNull,
+              datetimeRebaseMode.toString,
+              int96RebaseMode.toString,
+              enableOffHeapColumnVector && taskContext.isDefined,
+              capacity)
           }
-        val int96RebaseMode = DataSourceUtils.int96RebaseMode(
-          footerFileMetaData.getKeyValueMetaData.get,
-          SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
-
-        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-        // use new conf
-        val hadoopAttempConf = new Configuration(broadcastedHadoopConf.value.value)
-        //
-        // reset request schema
-        var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-        if (internalSchemaChangeEnabled) {
-          val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
-          val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-          typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
-          hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
-        }
-        val hadoopAttemptContext =
-          new TaskAttemptContextImpl(hadoopAttempConf, attemptId)
 
-        // Try to push down filters when filter push-down is enabled.
-        // Notice: This push-down is RowGroups level, not individual records.
-        if (pushed.isDefined) {
-          ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
-        }
-        val taskContext = Option(TaskContext.get())
-        if (enableVectorizedReader) {
-          val vectorizedReader = new Spark312HoodieVectorizedParquetRecordReader(
-            convertTz.orNull,
-            datetimeRebaseMode.toString,
-            int96RebaseMode.toString,
-            enableOffHeapColumnVector && taskContext.isDefined,
-            capacity, typeChangeInfos)
-          val iter = new RecordReaderIterator(vectorizedReader)
-          // SPARK-23457 Register a task completion listener before `initialization`.
-          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-          vectorizedReader.initialize(split, hadoopAttemptContext)
+        val iter = new RecordReaderIterator(vectorizedReader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+        vectorizedReader.initialize(split, hadoopAttemptContext)
+
+        // NOTE: We're making appending of the partitioned values to the rows read from the
+        //       data file configurable
+        if (shouldAppendPartitionValues) {
           logDebug(s"Appending $partitionSchema ${file.partitionValues}")
           vectorizedReader.initBatch(partitionSchema, file.partitionValues)
-          if (returningBatch) {
-            vectorizedReader.enableReturningBatches()
-          }
+        } else {
+          vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
+        }
+
+        if (returningBatch) {
+          vectorizedReader.enableReturningBatches()
+        }
 
-          // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-          iter.asInstanceOf[Iterator[InternalRow]]
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
+      } else {
+        logDebug(s"Falling back to parquet-mr")
+        // ParquetRecordReader returns InternalRow
+        val readSupport = new ParquetReadSupport(
+          convertTz,
+          enableVectorizedReader = false,
+          datetimeRebaseMode,
+          int96RebaseMode)
+        val reader = if (pushed.isDefined && enableRecordFilter) {
+          val parquetFilter = FilterCompat.get(pushed.get, null)
+          new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
         } else {
-          logDebug(s"Falling back to parquet-mr")
-          // ParquetRecordReader returns InternalRow
-          val readSupport = new ParquetReadSupport(
-            convertTz,
-            enableVectorizedReader = false,
-            datetimeRebaseMode,
-            int96RebaseMode)
-          val reader = if (pushed.isDefined && enableRecordFilter) {
-            val parquetFilter = FilterCompat.get(pushed.get, null)
-            new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
-          } else {
-            new ParquetRecordReader[InternalRow](readSupport)
-          }
-          val iter = new RecordReaderIterator[InternalRow](reader)
-          // SPARK-23457 Register a task completion listener before `initialization`.
-          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-          reader.initialize(split, hadoopAttemptContext)
-
-          val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-          val unsafeProjection = if (typeChangeInfos.isEmpty) {
-            GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-          } else {
-            // find type changed.
-            val newFullSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
-              if (typeChangeInfos.containsKey(i)) {
-                StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata)
-              } else f
-            }).toAttributes ++ partitionSchema.toAttributes
-            val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
-              if (typeChangeInfos.containsKey(i)) {
-                Cast(attr, typeChangeInfos.get(i).getLeft)
-              } else attr
-            }
-            GenerateUnsafeProjection.generate(castSchema, newFullSchema)
-          }
+          new ParquetRecordReader[InternalRow](readSupport)
+        }
+        val iter = new RecordReaderIterator[InternalRow](reader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+        reader.initialize(split, hadoopAttemptContext)
 
-          if (partitionSchema.length == 0) {
-            // There is no partition columns
-            iter.map(unsafeProjection)
-          } else {
-            val joinedRow = new JoinedRow()
-            iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+        val unsafeProjection = if (typeChangeInfos.isEmpty) {
+          GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+        } else {
+          // find type changed.
+          val newFullSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
+            if (typeChangeInfos.containsKey(i)) {
+              StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata)
+            } else f
+          }).toAttributes ++ partitionSchema.toAttributes
+          val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
+            if (typeChangeInfos.containsKey(i)) {
+              Cast(attr, typeChangeInfos.get(i).getLeft)
+            } else attr
           }
+          GenerateUnsafeProjection.generate(castSchema, newFullSchema)
+        }
+
+        // NOTE: We're making appending of the partitioned values to the rows read from the
+        //       data file configurable
+        if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
+          // Either appending is disabled or there are no partition columns: project rows as-is
+          iter.map(unsafeProjection)
+        } else {
+          val joinedRow = new JoinedRow()
+          iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
         }
       }
     }
@@ -300,6 +333,16 @@ object Spark312HoodieParquetFileFormat {
 
   val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
 
+  def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
+    val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+    if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
+      val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
+      SerDeHelper.toJson(prunedSchema)
+    } else {
+      internalSchemaStr
+    }
+  }
+
   private def createParquetFilters(arg: Any*): ParquetFilters = {
     val clazz = Class.forName(PARQUET_FILTERS_CLASS_NAME, true, Thread.currentThread().getContextClassLoader)
     val ctor = clazz.getConstructors.head
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index bad392b4f9..15624c7411 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32HoodieParquetFileFormat}
 import org.apache.spark.sql.parser.HoodieSpark3_2ExtendedSqlParser
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_2CatalystExpressionUtils, SparkSession}
@@ -80,14 +80,7 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
     }
   }
 
-  override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = {
-    if (SPARK_VERSION.startsWith("3.2")) {
-      val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat"
-      val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader)
-      val ctor = clazz.getConstructors.head
-      Some(ctor.newInstance().asInstanceOf[ParquetFileFormat])
-    } else {
-      None
-    }
+  override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
+    Some(new Spark32HoodieParquetFileFormat(appendPartitionValues))
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index 28db473965..f2a0a21df8 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.net.URI
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.FileSplit
@@ -27,6 +25,7 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
@@ -34,226 +33,266 @@ import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
-
-  // reference ParquetFileFormat from spark project
-  override def buildReaderWithPartitionValues(
-                                               sparkSession: SparkSession,
-                                               dataSchema: StructType,
-                                               partitionSchema: StructType,
-                                               requiredSchema: StructType,
-                                               filters: Seq[Filter],
-                                               options: Map[String, String],
-                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
-      // fallback to origin parquet File read
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-    } else {
-      hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
-      hadoopConf.set(
-        ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        ParquetWriteSupport.SPARK_ROW_SCHEMA,
-        requiredSchema.json)
-      hadoopConf.set(
-        SQLConf.SESSION_LOCAL_TIMEZONE.key,
-        sparkSession.sessionState.conf.sessionLocalTimeZone)
-      hadoopConf.setBoolean(
-        SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-        sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
-      hadoopConf.setBoolean(
-        SQLConf.CASE_SENSITIVE.key,
-        sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-      ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
-
-      // Sets flags for `ParquetToSparkSchemaConverter`
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        sparkSession.sessionState.conf.isParquetBinaryAsString)
-      hadoopConf.setBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
-      // for dataSource v1, we have no method to do project for spark physical plan.
-      // it's safe to do cols project here.
-      val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-      val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-      if (querySchemaOption.isPresent && !requiredSchema.isEmpty) {
-        val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
-        hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema))
+import java.net.URI
+
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way
+ *
+ * NOTE: This is a version of the [[ParquetFileFormat]] impl from Spark 3.2.1 w/ the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data file</li>
+ *   <li>Schema on-read</li>
+ * </ol>
+ */
+class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.CASE_SENSITIVE.key,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+    val internalSchemaStr = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+    // For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself,
+    // therefore it's safe to do schema projection here
+    if (!isNullOrEmpty(internalSchemaStr)) {
+      val prunedInternalSchemaStr =
+        pruneInternalSchema(internalSchemaStr, requiredSchema)
+      hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr)
+    }
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    // TODO: if you move this into the closure it reverts to the default values.
+    // If true, enable using the custom RecordReader for parquet. This only works for
+    // a subset of the types (no complex types).
+    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+    val sqlConf = sparkSession.sessionState.conf
+    val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
+    val enableVectorizedReader: Boolean =
+      sqlConf.parquetVectorizedReaderEnabled &&
+        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
+    val capacity = sqlConf.parquetVectorizedReaderBatchSize
+    val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
+    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
+    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+
+    (file: PartitionedFile) => {
+      assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
+
+      val filePath = new Path(new URI(file.filePath))
+      val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
+
+      val sharedConf = broadcastedHadoopConf.value.value
+
+      // Fetch internal schema
+      val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+      // Internal schema has to be pruned at this point
+      val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+
+      val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
+
+      val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+      val fileSchema = if (shouldUseInternalSchema) {
+        val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+        InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+      } else {
+        null
       }
-      val broadcastedHadoopConf =
-        sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-
-      // TODO: if you move this into the closure it reverts to the default values.
-      // If true, enable using the custom RecordReader for parquet. This only works for
-      // a subset of the types (no complex types).
-      val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
-      val sqlConf = sparkSession.sessionState.conf
-      val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-      val enableVectorizedReader: Boolean =
-        sqlConf.parquetVectorizedReaderEnabled &&
-          resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
-      val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
-      val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
-      val capacity = sqlConf.parquetVectorizedReaderBatchSize
-      val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
-      // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-      val returningBatch = supportBatch(sparkSession, resultSchema)
-      val pushDownDate = sqlConf.parquetFilterPushDownDate
-      val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
-      val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-      val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
-      val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-      val isCaseSensitive = sqlConf.caseSensitiveAnalysis
-      val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
-      val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
-      val int96RebaseModeInread = parquetOptions.int96RebaseModeInRead
-
-      (file: PartitionedFile) => {
-        assert(file.partitionValues.numFields == partitionSchema.size)
-        val filePath = new Path(new URI(file.filePath))
-        val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
-        val sharedConf = broadcastedHadoopConf.value.value
-        // do deal with internalSchema
-        val internalSchemaString = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
-        // querySchema must be a pruned schema.
-        val querySchemaOption = SerDeHelper.fromJson(internalSchemaString)
-        val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty || !querySchemaOption.isPresent) false else true
-        val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
-        val fileSchema = if (internalSchemaChangeEnabled) {
-          val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
-          InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
-        } else {
-          // this should not happened, searchSchemaAndCache will deal with correctly.
-          null
-        }
 
-        lazy val footerFileMetaData =
-          ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-        val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
-          footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
-        // Try to push down filters when filter push-down is enabled.
-        val pushed = if (enableParquetFilterPushDown) {
-          val parquetSchema = footerFileMetaData.getSchema
-          val parquetFilters = new ParquetFilters(
-            parquetSchema,
-            pushDownDate,
-            pushDownTimestamp,
-            pushDownDecimal,
-            pushDownStringStartWith,
-            pushDownInFilterThreshold,
-            isCaseSensitive,
-            datetimeRebaseSpec)
-          filters.map(Spark32HoodieParquetFileFormat.rebuildFilterFromParquet(_, fileSchema, querySchemaOption.get()))
-            // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-            // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-            // is used here.
-            .flatMap(parquetFilters.createFilter(_))
-            .reduceOption(FilterApi.and)
+      lazy val footerFileMetaData =
+        ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+        footerFileMetaData.getKeyValueMetaData.get,
+        datetimeRebaseModeInRead)
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        val parquetSchema = footerFileMetaData.getSchema
+        val parquetFilters = new ParquetFilters(
+          parquetSchema,
+          pushDownDate,
+          pushDownTimestamp,
+          pushDownDecimal,
+          pushDownStringStartWith,
+          pushDownInFilterThreshold,
+          isCaseSensitive,
+          datetimeRebaseSpec)
+        filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(parquetFilters.createFilter)
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
+
+      // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps
+      // *only* if the file was created by something other than "parquet-mr", so check the actual
+      // writer here for this file.  We have to do this per-file, as each file in the table may
+      // have different writers.
+      // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+      def isCreatedByParquetMr: Boolean =
+        footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+      val convertTz =
+        if (timestampConversion && !isCreatedByParquetMr) {
+          Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
         } else {
           None
         }
 
-        // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
-        // *only* if the file was created by something other than "parquet-mr", so check the actual
-        // writer here for this file.  We have to do this per-file, as each file in the table may
-        // have different writers.
-        // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
-        def isCreatedByParquetMr: Boolean =
-          footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
-
-        val convertTz =
-          if (timestampConversion && !isCreatedByParquetMr) {
-            Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+        footerFileMetaData.getKeyValueMetaData.get,
+        int96RebaseModeInRead)
+
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+
+      // Clone new conf
+      val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
+      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
+      if (shouldUseInternalSchema) {
+        val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
+        val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
+        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+        hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+      }
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
+
+      // Try to push down filters when filter push-down is enabled.
+      // Notice: This push-down is RowGroups level, not individual records.
+      if (pushed.isDefined) {
+        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+      }
+      val taskContext = Option(TaskContext.get())
+      if (enableVectorizedReader) {
+        val vectorizedReader =
+          if (shouldUseInternalSchema) {
+            new Spark32HoodieVectorizedParquetRecordReader(
+              convertTz.orNull,
+              datetimeRebaseSpec.mode.toString,
+              datetimeRebaseSpec.timeZone,
+              int96RebaseSpec.mode.toString,
+              int96RebaseSpec.timeZone,
+              enableOffHeapColumnVector && taskContext.isDefined,
+              capacity,
+              typeChangeInfos)
           } else {
-            None
+            new VectorizedParquetRecordReader(
+              convertTz.orNull,
+              datetimeRebaseSpec.mode.toString,
+              datetimeRebaseSpec.timeZone,
+              int96RebaseSpec.mode.toString,
+              int96RebaseSpec.timeZone,
+              enableOffHeapColumnVector && taskContext.isDefined,
+              capacity)
           }
-        val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
-          footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInread)
-
-        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-        // use new conf
-        val hadoopAttempConf = new Configuration(broadcastedHadoopConf.value.value)
+        // SPARK-37089: We cannot register a task completion listener to close this iterator here
+        // because downstream exec nodes have already registered their listeners. Since listeners
+        // are executed in reverse order of registration, a listener registered here would close the
+        // iterator while downstream exec nodes are still running. When off-heap column vectors are
+        // enabled, this can cause a use-after-free bug leading to a segfault.
         //
-        // reset request schema
-        var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-        if (internalSchemaChangeEnabled) {
-          val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
-          val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-          typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
-          hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
-        }
-        val hadoopAttemptContext =
-          new TaskAttemptContextImpl(hadoopAttempConf, attemptId)
+        // Instead, we use FileScanRDD's task completion listener to close this iterator.
+        val iter = new RecordReaderIterator(vectorizedReader)
+        try {
+          vectorizedReader.initialize(split, hadoopAttemptContext)
 
-        // Try to push down filters when filter push-down is enabled.
-        // Notice: This push-down is RowGroups level, not individual records.
-        if (pushed.isDefined) {
-          ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
-        }
-        val taskContext = Option(TaskContext.get())
-        if (enableVectorizedReader) {
-          val vectorizedReader = new Spark32HoodieVectorizedParquetRecordReader(
-            convertTz.orNull,
-            datetimeRebaseSpec.mode.toString,
-            datetimeRebaseSpec.timeZone,
-            int96RebaseSpec.mode.toString,
-            int96RebaseSpec.timeZone,
-            enableOffHeapColumnVector && taskContext.isDefined,
-            capacity, typeChangeInfos)
-          val iter = new RecordReaderIterator(vectorizedReader)
-          // SPARK-23457 Register a task completion listener before `initialization`.
-          //          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-          try {
-            vectorizedReader.initialize(split, hadoopAttemptContext)
+          // NOTE: We're making appending of the partitioned values to the rows read from the
+          //       data file configurable
+          if (shouldAppendPartitionValues) {
             logDebug(s"Appending $partitionSchema ${file.partitionValues}")
             vectorizedReader.initBatch(partitionSchema, file.partitionValues)
-            if (returningBatch) {
-              vectorizedReader.enableReturningBatches()
-            }
+          } else {
+            vectorizedReader.initBatch(StructType(Nil), InternalRow.empty)
+          }
 
-            // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-            iter.asInstanceOf[Iterator[InternalRow]]
-          } catch {
-            case e: Throwable =>
-              // SPARK-23457: In case there is an exception in initialization, close the iterator to
-              // avoid leaking resources.
-              iter.close()
-              throw e
+          if (returningBatch) {
+            vectorizedReader.enableReturningBatches()
           }
+
+          // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+          iter.asInstanceOf[Iterator[InternalRow]]
+        } catch {
+          case e: Throwable =>
+            // SPARK-23457: In case there is an exception in initialization, close the iterator to
+            // avoid leaking resources.
+            iter.close()
+            throw e
+        }
+      } else {
+        logDebug(s"Falling back to parquet-mr")
+        // ParquetRecordReader returns InternalRow
+        val readSupport = new ParquetReadSupport(
+          convertTz,
+          enableVectorizedReader = false,
+          datetimeRebaseSpec,
+          int96RebaseSpec)
+        val reader = if (pushed.isDefined && enableRecordFilter) {
+          val parquetFilter = FilterCompat.get(pushed.get, null)
+          new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
         } else {
-          logDebug(s"Falling back to parquet-mr")
-          // ParquetRecordReader returns InternalRow
-          val readSupport = new ParquetReadSupport(
-            convertTz,
-            enableVectorizedReader = false,
-            datetimeRebaseSpec,
-            int96RebaseSpec)
-          val reader = if (pushed.isDefined && enableRecordFilter) {
-            val parquetFilter = FilterCompat.get(pushed.get, null)
-            new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
-          } else {
-            new ParquetRecordReader[InternalRow](readSupport)
-          }
-          val iter = new RecordReaderIterator[InternalRow](reader)
-          // SPARK-23457 Register a task completion listener before `initialization`.
-          taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+          new ParquetRecordReader[InternalRow](readSupport)
+        }
+        val iter = new RecordReaderIterator[InternalRow](reader)
+        try {
           reader.initialize(split, hadoopAttemptContext)
 
           val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
@@ -274,13 +313,21 @@ class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
             GenerateUnsafeProjection.generate(castSchema, newFullSchema)
           }
 
-          if (partitionSchema.length == 0) {
+          // NOTE: Appending the partition values to the rows read from the data file is
+          //       configurable (controlled by `shouldAppendPartitionValues`)
+          if (!shouldAppendPartitionValues || partitionSchema.length == 0) {
             // There is no partition columns
             iter.map(unsafeProjection)
           } else {
             val joinedRow = new JoinedRow()
             iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
           }
+        } catch {
+          case e: Throwable =>
+            // SPARK-23457: In case there is an exception in initialization, close the iterator to
+            // avoid leaking resources.
+            iter.close()
+            throw e
         }
       }
     }
@@ -289,6 +336,16 @@ class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
 
 object Spark32HoodieParquetFileFormat {
 
+  def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
+    val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
+    if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
+      val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get())
+      SerDeHelper.toJson(prunedSchema)
+    } else {
+      internalSchemaStr
+    }
+  }
+
   private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
     if (fileSchema == null || querySchema == null) {
       oldFilter


[hudi] 01/07: [HUDI-3902] Fallback to `HadoopFsRelation` in cases non-involving Schema Evolution (#5352)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 40c80f6627956e4cdb7a931c40b7363e81272c58
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Tue Apr 19 10:40:20 2022 -0700

    [HUDI-3902] Fallback to `HadoopFsRelation` in cases non-involving Schema Evolution (#5352)
    
    
    
    Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
---
 .../org/apache/hudi/BaseFileOnlyRelation.scala     | 50 +++++++++++++++++++++-
 .../main/scala/org/apache/hudi/DefaultSource.scala | 36 ++++++++++++++--
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 15 ++++---
 .../scala/org/apache/hudi/SparkDatasetMixin.scala  | 43 +++++++++++++++++++
 .../apache/hudi/functional/TestCOWDataSource.scala | 19 ++++----
 .../hudi/functional/TestCOWDataSourceStorage.scala |  3 +-
 .../apache/hudi/functional/TestMORDataSource.scala | 20 +++------
 .../functional/TestParquetColumnProjection.scala   |  9 ++--
 8 files changed, 156 insertions(+), 39 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index f46b31b036..adf94fffde 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -20,13 +20,15 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-
 import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
+import org.apache.hudi.common.model.HoodieFileFormat
 import org.apache.hudi.common.table.HoodieTableMetaClient
-
+import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
 
@@ -104,4 +106,48 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
     sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes)
       .map(HoodieBaseFileSplit.apply)
   }
+
+  /**
+   * NOTE: We have to fall back to [[HadoopFsRelation]] to make sure that all of the Spark optimizations can be
+   *       equally applied to Hudi tables, since some of those are predicated on the usage of [[HadoopFsRelation]]
+   *       and won't be applicable when we use our own custom relations (one such optimization is the [[SchemaPruning]]
+   *       rule; you can find more details in HUDI-3896)
+   */
+  def toHadoopFsRelation: HadoopFsRelation = {
+    val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
+      case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+      case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
+    }
+
+    if (globPaths.isEmpty) {
+      HadoopFsRelation(
+        location = fileIndex,
+        partitionSchema = fileIndex.partitionSchema,
+        dataSchema = fileIndex.dataSchema,
+        bucketSpec = None,
+        fileFormat = tableFileFormat,
+        optParams)(sparkSession)
+    } else {
+      val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
+      val extraReadPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
+
+      DataSource.apply(
+        sparkSession = sparkSession,
+        paths = extraReadPaths,
+        userSpecifiedSchema = userSchema,
+        className = formatClassName,
+        // Since we're reading the table as just a collection of files, we have to make sure
+        // we only read the latest version of every Hudi file-group, which might be compacted, clustered, etc.,
+        // while previous versions of the files are kept around as well.
+        //
+        // We rely on [[HoodieROTablePathFilter]] to do the filtering that ensures this
+        options = optParams ++ Map(
+          "mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName
+        ),
+        partitionColumns = partitionColumns
+      )
+        .resolveRelation()
+        .asInstanceOf[HadoopFsRelation]
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 7550ff13fd..c1229d5500 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,11 +21,13 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION}
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE
 import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
@@ -108,7 +110,7 @@ class DefaultSource extends RelationProvider
         case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
              (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
              (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
-          new BaseFileOnlyRelation(sqlContext, metaClient, parameters, userSchema, globPaths)
+          resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
         case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
           new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
 
@@ -141,7 +143,7 @@ class DefaultSource extends RelationProvider
     *
     * TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API.
     *       That is the only case where Spark seems to actually need a relation to be returned here
-    *       [[DataSource.writeAndRead()]]
+    *       [[org.apache.spark.sql.execution.datasources.DataSource.writeAndRead()]]
     *
     * @param sqlContext Spark SQL Context
     * @param mode Mode for saving the DataFrame at the destination
@@ -206,4 +208,32 @@ class DefaultSource extends RelationProvider
                             parameters: Map[String, String]): Source = {
     new HoodieStreamSource(sqlContext, metadataPath, schema, parameters)
   }
+
+  private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
+                                          globPaths: Seq[Path],
+                                          userSchema: Option[StructType],
+                                          metaClient: HoodieTableMetaClient,
+                                          optParams: Map[String, String]) = {
+    val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)
+    val enableSchemaOnRead: Boolean = !tryFetchInternalSchema(metaClient).isEmptySchema
+
+    // NOTE: We fall back to [[HadoopFsRelation]] in all of the cases except the ones requiring usage of
+    //       [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/
+    //       vanilla Spark, since some of the Spark optimizations are predicated on the use of [[HadoopFsRelation]].
+    //
+    //       You can check out HUDI-3896 for more details
+    if (enableSchemaOnRead) {
+      baseRelation
+    } else {
+      baseRelation.toHadoopFsRelation
+    }
+  }
+
+  private def tryFetchInternalSchema(metaClient: HoodieTableMetaClient) =
+    try {
+      new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata
+        .orElse(InternalSchema.getEmptyInternalSchema)
+    } catch {
+      case _: Exception => InternalSchema.getEmptyInternalSchema
+    }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 769016ca8a..53667f3b88 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -19,12 +19,10 @@ package org.apache.hudi
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
-
 import org.apache.hudi.HoodieBaseRelation.getPartitionPath
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
@@ -38,13 +36,13 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.io.storage.HoodieHFileReader
-
 import org.apache.spark.TaskContext
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
+import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
@@ -54,7 +52,6 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import java.io.Closeable
 import java.net.URI
-
 import scala.collection.JavaConverters._
 import scala.util.Try
 import scala.util.control.NonFatal
@@ -78,7 +75,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
                                   val metaClient: HoodieTableMetaClient,
                                   val optParams: Map[String, String],
                                   userSchema: Option[StructType])
-  extends BaseRelation with PrunedFilteredScan with Logging with SparkAdapterSupport {
+  extends BaseRelation
+    with FileRelation
+    with PrunedFilteredScan
+    with SparkAdapterSupport
+    with Logging {
 
   type FileSplit <: HoodieFileSplit
 
@@ -198,6 +199,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    */
   override final def needConversion: Boolean = false
 
+  override def inputFiles: Array[String] = fileIndex.allFiles.map(_.getPath.toUri.toString).toArray
+
   /**
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
@@ -255,6 +258,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       sparkSession.sparkContext.emptyRDD
   }
 
+
+
   /**
    * Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied
    *
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/SparkDatasetMixin.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/SparkDatasetMixin.scala
new file mode 100644
index 0000000000..ee733a86a6
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/SparkDatasetMixin.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions.collectionAsScalaIterable
+
+trait SparkDatasetMixin {
+
+  def toDataset(spark: SparkSession, records: java.util.List[HoodieRecord[_]]) = {
+    val avroRecords = records.map(
+      _.getData
+        .asInstanceOf[HoodieRecordPayload[_]]
+        .getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA)
+        .get
+        .asInstanceOf[GenericRecord]
+    )
+      .toSeq
+    val rdd: RDD[GenericRecord] = spark.sparkContext.parallelize(avroRecords)
+    AvroConversionUtils.createDataFrame(rdd, HoodieTestDataGenerator.AVRO_SCHEMA.toString, spark)
+  }
+
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 000004ace9..b232ef010f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -37,7 +37,7 @@ import org.joda.time.DateTime
 import org.joda.time.format.DateTimeFormat
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
 import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
@@ -897,6 +897,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
       readResult.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(","))
   }
 
+  @Disabled("HUDI-3204")
   @Test
   def testHoodieBaseFileOnlyViewRelation(): Unit = {
     val _spark = spark
@@ -918,9 +919,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
       .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.TimestampBasedKeyGenerator")
       .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "DATE_STRING")
+      .option(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy-MM-dd")
       .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyy/MM/dd")
       .option(Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT+8:00")
-      .option(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy-MM-dd")
       .mode(org.apache.spark.sql.SaveMode.Append)
       .save(basePath)
 
@@ -929,15 +930,13 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assert(res.count() == 2)
 
     // data_date is the partition field. Persist to the parquet file using the origin values, and read it.
-    assertTrue(
-      res.select("data_date").map(_.get(0).toString).collect().sorted.sameElements(
-        Array("2018-09-23", "2018-09-24")
-      )
+    assertEquals(
+      res.select("data_date").map(_.get(0).toString).collect().sorted,
+      Array("2018-09-23", "2018-09-24")
     )
-    assertTrue(
-      res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.sameElements(
-        Array("2018/09/23", "2018/09/24")
-      )
+    assertEquals(
+      res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted,
+      Array("2018/09/23", "2018/09/24")
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index e7daf08d11..bd7edd4db5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -33,7 +33,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDat
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Disabled, Tag}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
@@ -57,6 +57,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"
 
+  @Disabled("HUDI-3896")
   @ParameterizedTest
   @CsvSource(Array(
     "true,org.apache.hudi.keygen.SimpleKeyGenerator",
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index d8ebe5cbcd..b57cbb09ed 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils, SparkDatasetMixin}
 import org.apache.log4j.LogManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -48,7 +48,7 @@ import scala.collection.JavaConverters._
 /**
  * Tests on Spark DataSource for MOR table.
  */
-class TestMORDataSource extends HoodieClientTestBase {
+class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
 
   var spark: SparkSession = null
   private val log = LogManager.getLogger(classOf[TestMORDataSource])
@@ -356,7 +356,7 @@ class TestMORDataSource extends HoodieClientTestBase {
 
     val hoodieRecords1 = dataGen.generateInserts("001", 100)
 
-    val inputDF1 = toDataset(hoodieRecords1)
+    val inputDF1 = toDataset(spark, hoodieRecords1)
     inputDF1.write.format("org.apache.hudi")
       .options(opts)
       .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
@@ -382,7 +382,7 @@ class TestMORDataSource extends HoodieClientTestBase {
     // Upsert 50 update records
     // Snopshot view should read 100 records
     val records2 = dataGen.generateUniqueUpdates("002", 50)
-    val inputDF2 = toDataset(records2)
+    val inputDF2 = toDataset(spark, records2)
     inputDF2.write.format("org.apache.hudi")
       .options(opts)
       .mode(SaveMode.Append)
@@ -429,7 +429,7 @@ class TestMORDataSource extends HoodieClientTestBase {
     verifyShow(hudiIncDF1Skipmerge)
 
     val record3 = dataGen.generateUpdatesWithTS("003", hoodieRecords1, -1)
-    val inputDF3 = toDataset(record3)
+    val inputDF3 = toDataset(spark, record3)
     inputDF3.write.format("org.apache.hudi").options(opts)
       .mode(SaveMode.Append).save(basePath)
 
@@ -443,16 +443,6 @@ class TestMORDataSource extends HoodieClientTestBase {
     assertEquals(0, hudiSnapshotDF3.filter("rider = 'rider-003'").count())
   }
 
-  private def toDataset(records: util.List[HoodieRecord[_]]) = {
-    val avroRecords = records.map(_.getData
-      .asInstanceOf[HoodieRecordPayload[_]]
-      .getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA)
-      .get
-      .asInstanceOf[GenericRecord])
-    val rdd: RDD[GenericRecord] = spark.sparkContext.parallelize(avroRecords, 2)
-    AvroConversionUtils.createDataFrame(rdd, HoodieTestDataGenerator.AVRO_SCHEMA.toString, spark)
-  }
-
   @Test
   def testVectorizedReader() {
     spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index 2cdd7880bf..f670450c3e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.{Dataset, HoodieUnsafeRDDUtils, Row, SaveMode}
 import org.junit.jupiter.api.Assertions.{assertEquals, fail}
-import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.{Disabled, Tag, Test}
 
 import scala.collection.JavaConverters._
 
@@ -53,6 +53,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
     DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName
   )
 
+  @Disabled("HUDI-3896")
   @Test
   def testBaseFileOnlyViewRelation(): Unit = {
     val tablePath = s"$basePath/cow"
@@ -129,7 +130,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
         fail("Only Spark 3 and Spark 2 are currently supported")
 
     // Test MOR / Read Optimized
-    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized)
+    // TODO(HUDI-3896) re-enable
+    //runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized)
   }
 
   @Test
@@ -184,7 +186,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
         fail("Only Spark 3 and Spark 2 are currently supported")
 
     // Test MOR / Read Optimized
-    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized)
+    // TODO(HUDI-3896) re-enable
+    //runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized)
   }
 
   @Test


[hudi] 06/07: [HUDI-3912] Fix lose data when rollback in flink async compact (#5357)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit fcaeaea62f42e0bd1e49970ebcc1830519c80f2a
Author: 吴祥平 <40...@qq.com>
AuthorDate: Wed Apr 20 19:23:39 2022 +0800

    [HUDI-3912] Fix lose data when rollback in flink async compact (#5357)
    
    * stop add event when has failed compact event
    
    Co-authored-by: wxp <wx...@outlook.com>
---
 .../apache/hudi/sink/compact/CompactionCommitSink.java  | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index ecd66936e8..c9fb7aceb2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -101,11 +101,6 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
   @Override
   public void invoke(CompactionCommitEvent event, Context context) throws Exception {
     final String instant = event.getInstant();
-    if (event.isFailed()) {
-      // handle failure case
-      CompactionUtil.rollbackCompaction(table, event.getInstant());
-      return;
-    }
     commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
         .put(event.getFileId(), event);
     commitIfNecessary(instant, commitBuffer.get(instant).values());
@@ -132,6 +127,18 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
     if (!isReady) {
       return;
     }
+
+    if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) {
+      try {
+        // handle failure case
+        CompactionUtil.rollbackCompaction(table, instant);
+      } finally {
+        // remove commitBuffer to avoid obsolete metadata commit
+        reset(instant);
+        return;
+      }
+    }
+
     try {
       doCommit(instant, events);
     } catch (Throwable throwable) {