You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by ak...@apache.org on 2022/10/17 23:09:11 UTC

[hudi] branch release-feature-rfc46 updated: [MINOR] Additional fixes for #6745 (#6947)

This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-feature-rfc46 by this push:
     new 0036519120 [MINOR] Additional fixes for #6745 (#6947)
0036519120 is described below

commit 00365191206955b868b2675f2345b6f650b291a0
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Mon Oct 17 16:09:02 2022 -0700

    [MINOR] Additional fixes for #6745 (#6947)
    
    * Tidying up
    
    * Tidying up more
    
    * Cleaning up duplication
    
    * Tidying up
    
    * Revisited legacy operating mode configuration
    
    * Tidying up
    
    * Cleaned up `projectUnsafe` API
    
    * Fixing compilation
    
    * Cleaning up `HoodieSparkRecord` ctors;
    Revisited mandatory unsafe-projection
    
    * Fixing compilation
    
    * Cleaned up `ParquetReader` initialization
    
    * Revisited `HoodieSparkRecord` to accept either `UnsafeRow` or `HoodieInternalRow`, and avoid unnecessary copying after unsafe-projection
    
    * Cleaning up redundant exception spec
    
    * Make sure `updateMetadataFields` properly wraps `InternalRow` into `HoodieInternalRow` if necessary;
    Cleaned up `MetadataValues`
    
    * Fixed meta-fields extraction and `HoodieInternalRow` composition w/in `HoodieSparkRecord`
    
    * De-duplicate `HoodieSparkRecord` ctors;
    Make sure either only `UnsafeRow` or `HoodieInternalRow` are permitted inside `HoodieSparkRecord`
    
    * Removed unnecessary copying
    
    * Cleaned up projection for `HoodieSparkRecord` (dropping partition columns);
    Removed unnecessary copying
    
    * Fixing compilation
    
    * Fixing compilation (for Flink)
    
    * Cleaned up File Readers' interfaces:
      - Extracted `HoodieSeekingFileReader` interface (for key-ranged reads)
      - Pushed down concrete implementation methods into `HoodieAvroFileReaderBase` from the interfaces
    
    * Cleaned up File Readers impls (in line with the new interfaces)
    
    * Rebased `HoodieBackedTableMetadata` onto new `HoodieSeekingFileReader`
    
    * Tidying up
    
    * Missing licenses
    
    * Re-instate custom override for `HoodieAvroParquetReader`;
    Tidying up
    
    * Fixed missing cloning w/in `HoodieLazyInsertIterable`
    
    * Fixed missing cloning in deduplication flow
    
    * Allow `HoodieSparkRecord` to hold `ColumnarBatchRow`
    
    * Missing licenses
    
    * Fixing compilation
    
    * Missing changes
    
    * Fixed Spark 2.x validation whether the row was read as a batch
---
 .../hudi/execution/HoodieLazyInsertIterable.java   |  28 ++--
 .../org/apache/hudi/io/HoodieCreateHandle.java     |   3 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   7 +-
 .../hudi/io/HoodieMergeHandleWithChangeLog.java    |  28 ++--
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  11 ++
 .../hudi/table/action/commit/BaseMergeHelper.java  |  14 +-
 .../hudi/table/action/commit/BaseWriteHelper.java  |  14 +-
 .../table/action/commit/HoodieWriteHelper.java     |  11 +-
 .../io/storage/TestHoodieHFileReaderWriter.java    |   4 +-
 .../hudi/execution/FlinkLazyInsertIterable.java    |   2 +-
 .../FlinkMergeAndReplaceHandleWithChangeLog.java   |  15 +--
 .../hudi/io/FlinkMergeHandleWithChangeLog.java     |  15 +--
 .../hudi/table/action/commit/FlinkWriteHelper.java |   9 +-
 .../hudi/execution/JavaLazyInsertIterable.java     |   2 +-
 .../hudi/table/action/commit/JavaWriteHelper.java  |   9 +-
 .../hudi/commmon/model/HoodieSparkRecord.java      | 142 +++++++++++++--------
 .../hudi/execution/SparkLazyInsertIterable.java    |   2 +-
 .../bulkinsert/RDDSpatialCurveSortPartitioner.java |  17 +--
 .../hudi/io/storage/HoodieSparkFileReader.java     |  36 +-----
 .../hudi/io/storage/HoodieSparkParquetReader.java  |  35 +++--
 .../bootstrap/ParquetBootstrapMetadataHandler.java |   3 +-
 .../org/apache/hudi/HoodieInternalRowUtils.scala   |   9 --
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   5 +
 .../TestBoundedInMemoryExecutorInSpark.java        |   8 +-
 .../hudi/execution/TestBoundedInMemoryQueue.java   |  16 +--
 .../hudi/common/model/HoodieAvroIndexedRecord.java |   3 +-
 .../apache/hudi/common/model/HoodieAvroRecord.java |   3 +-
 .../hudi/common/model/HoodieAvroRecordMerger.java  |  60 ++++++---
 .../hudi/common/model/HoodieEmptyRecord.java       |   3 +-
 .../org/apache/hudi/common/model/HoodieRecord.java |   2 +-
 .../hudi/common/model/HoodieRecordMerger.java      |   7 +-
 .../apache/hudi/common/model/MetadataValues.java   |  28 ++--
 .../table/log/AbstractHoodieLogRecordReader.java   |  16 +--
 .../table/log/HoodieMergedLogRecordScanner.java    |   3 +-
 .../common/table/log/block/HoodieDataBlock.java    |   2 +-
 .../table/log/block/HoodieHFileDataBlock.java      |  16 +--
 .../hudi/common/util/ParquetReaderIterator.java    |   4 +-
 .../apache/hudi/common/util/VisibleForTesting.java |  25 ++++
 .../hudi/io/storage/HoodieAvroFileReader.java      |  71 +----------
 .../hudi/io/storage/HoodieAvroFileReaderBase.java  |  48 +++++++
 .../hudi/io/storage/HoodieAvroHFileReader.java     |  57 ++++++---
 .../hudi/io/storage/HoodieAvroOrcReader.java       |  26 ++--
 .../hudi/io/storage/HoodieAvroParquetReader.java   |  22 +++-
 .../apache/hudi/io/storage/HoodieFileReader.java   |  46 +++----
 ...ileReader.java => HoodieSeekingFileReader.java} |  23 +---
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  61 ++++-----
 .../hudi/sink/clustering/ClusteringOperator.java   |   8 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |   3 +-
 .../hudi/hadoop/HoodieHFileRecordReader.java       |   7 +-
 .../reader/DFSHoodieDatasetInputReader.java        |   3 +-
 .../org/apache/hudi/HoodieSparkRecordMerger.java   |   9 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |   4 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  40 +++---
 .../scala/org/apache/hudi/LogFileIterator.scala    |  42 +++---
 .../SparkFullBootstrapDataProviderBase.java        |   9 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |   6 +-
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |   7 +
 .../apache/spark/sql/adapter/Spark3_1Adapter.scala |   3 +
 .../spark/sql/vectorized/ColumnarUtils.scala       |  32 +++++
 .../apache/spark/sql/adapter/Spark3_2Adapter.scala |   3 +
 .../spark/sql/vectorized/ColumnarUtils.scala       |  32 +++++
 .../apache/spark/sql/adapter/Spark3_3Adapter.scala |   3 +
 62 files changed, 635 insertions(+), 547 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
index b0831f0bc9..20f75f63c5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
@@ -21,9 +21,9 @@ package org.apache.hudi.execution;
 import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.WriteHandleFactory;
@@ -90,18 +90,24 @@ public abstract class HoodieLazyInsertIterable<T>
     }
   }
 
-  /**
-   * Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some
-   * expensive operations of transformation to the reader thread.
-   */
-  static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
-      Schema schema, HoodieWriteConfig config) {
-    return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, config.getProps());
+  static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema,
+                                                                                                       HoodieWriteConfig config) {
+    return getCloningTransformerInternal(schema, config.getProps());
   }
 
-  static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformFunction(
-      Schema schema) {
-    return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema, CollectionUtils.emptyProps());
+  static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema) {
+    return getCloningTransformerInternal(schema, new TypedProperties());
+  }
+
+  private static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformerInternal(Schema schema,
+                                                                                                                       TypedProperties props) {
+    return record -> {
+      // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
+      //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
+      //       it since these records will be subsequently buffered (w/in the in-memory queue)
+      HoodieRecord<T> clonedRecord = record.copy();
+      return new HoodieInsertValueGenResult(clonedRecord, schema, props);
+    };
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index d3782e9f20..6f6d96394f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -143,8 +143,7 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
         } else {
           rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields);
         }
-        MetadataValues metadataValues = new MetadataValues();
-        metadataValues.setFileName(path.getName());
+        MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
         rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues);
         if (preserveMetadata) {
           fileWriter.write(record.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 28377918b4..276b318890 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -388,12 +388,11 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     } else {
       rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields);
     }
-    MetadataValues metadataValues = new MetadataValues();
-    metadataValues.setFileName(newFilePath.getName());
+    // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
+    //       file holding this record even in cases when overall metadata is preserved
+    MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
     rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues);
     if (shouldPreserveRecordMetadata) {
-      // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
-      //       file holding this record even in cases when overall metadata is preserved
       fileWriter.write(key.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
     } else {
       fileWriter.writeWithMetadata(key, rewriteRecord, writeSchemaWithMetaFields);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index fda1435345..e39a60d390 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi.io;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -33,10 +36,6 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.generic.GenericRecord;
-
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -80,25 +79,16 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandl
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema)
+  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combinedRecordOpt, Schema writerSchema)
       throws IOException {
     // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
-    Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
-    HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
-    final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
+    Option<HoodieRecord> savedCombineRecordOp = combinedRecordOpt.map(HoodieRecord::newInstance);
+    final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combinedRecordOpt, writerSchema);
     if (result) {
       boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
-      Option<IndexedRecord> avroOpt = savedCombineRecordOp
-          .flatMap(r -> {
-            try {
-              return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
-                  .map(HoodieAvroIndexedRecord::getData);
-            } catch (IOException e) {
-              LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
-              return Option.empty();
-            }
-          });
-      cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
+      Option<IndexedRecord> avroRecordOpt = savedCombineRecordOp.flatMap(r ->
+          toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps()));
+      cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : avroRecordOpt);
     }
     return result;
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 2066e9c241..3e0691cdf0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
@@ -27,6 +28,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -279,4 +281,13 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
           + "file suffix: " + fileSuffix + " error");
     }
   }
+
+  protected static Option<IndexedRecord> toAvroRecord(HoodieRecord record, Schema writerSchema, TypedProperties props) {
+    try {
+      return record.toIndexedRecord(writerSchema, props).map(HoodieAvroIndexedRecord::getData);
+    } catch (IOException e) {
+      LOG.error("Fail to get indexRecord from " + record, e);
+      return Option.empty();
+    }
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 4c5db03d35..c1bf6de1fd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -33,8 +32,6 @@ import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
-import javax.annotation.Nonnull;
-
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -77,16 +74,7 @@ public abstract class BaseMergeHelper<T, I, K, O> {
     return new MergingIterator<>(
         (Iterator<HoodieRecord>) reader.getRecordIterator(readerSchema),
         (Iterator<HoodieRecord>) bootstrapReader.getRecordIterator(bootstrapReadSchema),
-        (oneRecord, otherRecord) -> mergeRecords(oneRecord, otherRecord, mergeHandle.getWriterSchemaWithMetaFields()));
-  }
-
-  @Nonnull
-  private static HoodieRecord mergeRecords(HoodieRecord left, HoodieRecord right, Schema targetSchema) {
-    try {
-      return left.joinWith(right, targetSchema);
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to merge records", e);
-    }
+        (oneRecord, otherRecord) -> oneRecord.joinWith(otherRecord, mergeHandle.getWriterSchemaWithMetaFields()));
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 622ed4573e..adef1c4459 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -82,17 +83,16 @@ public abstract class BaseWriteHelper<T, I, K, O, R> {
    * @param parallelism parallelism or partitions to be used while reducing/deduplicating
    * @return Collection of HoodieRecord already be deduplicated
    */
-  public I deduplicateRecords(
-      I records, HoodieTable<T, I, K, O> table, int parallelism) {
+  public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int parallelism) {
     HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
     return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
   }
 
-  public I deduplicateRecords(
-      I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) {
-    return deduplicateRecordsInternal(records, index, parallelism, schema, HoodieAvroRecordMerger.withDeDuping(props), merger);
+  public I deduplicateRecords(I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) {
+    TypedProperties updatedProps = HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining(props);
+    return doDeduplicateRecords(records, index, parallelism, schema, updatedProps, merger);
   }
 
-  protected abstract I deduplicateRecordsInternal(
-      I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger);
+  protected abstract I doDeduplicateRecords(
+      I records, HoodieIndex<?, ?> index, int parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 1893132e7d..6557f83b24 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
@@ -31,7 +32,6 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
 import java.io.IOException;
-import java.util.Properties;
 
 public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
     HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
@@ -53,8 +53,8 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
   }
 
   @Override
-  public HoodieData<HoodieRecord<T>> deduplicateRecordsInternal(
-      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
+  protected HoodieData<HoodieRecord<T>> doDeduplicateRecords(
+      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
     boolean isIndexingGlobal = index.isGlobal();
     final SerializableSchema schema = new SerializableSchema(schemaStr);
     // Auto-tunes the parallelism for reduce transformation based on the number of data partitions
@@ -64,7 +64,10 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
       Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
-      return Pair.of(key, record);
+      // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+      //       Here we have to make a copy of the incoming record, since it might be holding
+      //       an instance of [[InternalRow]] pointing into shared, mutable buffer
+      return Pair.of(key, record.copy());
     }).reduceByKey((rec1, rec2) -> {
       HoodieRecord<T> reducedRecord;
       try {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
index 5fb7065a8a..815b04c9db 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
@@ -228,14 +228,14 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
         IntStream.concat(IntStream.range(40, NUM_RECORDS * 2), IntStream.range(10, 20))
             .mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toList());
     Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
-    Iterator<IndexedRecord> iterator = hfileReader.getIndexedRecordsByKeysIterator(keys, avroSchema);
+    Iterator<HoodieRecord<IndexedRecord>> iterator = hfileReader.getRecordsByKeysIterator(keys, avroSchema);
 
     List<Integer> expectedIds =
         IntStream.concat(IntStream.range(40, NUM_RECORDS), IntStream.range(10, 20))
             .boxed().collect(Collectors.toList());
     int index = 0;
     while (iterator.hasNext()) {
-      GenericRecord record = (GenericRecord) iterator.next();
+      GenericRecord record = (GenericRecord) iterator.next().getData();
       String key = "key" + String.format("%02d", expectedIds.get(index));
       assertEquals(key, record.get("_row_key").toString());
       assertEquals(Integer.toString(expectedIds.get(index)), record.get("time").toString());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index 5ac00bd805..120fdb7deb 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -61,7 +61,7 @@ public class FlinkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
     try {
       final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr),
-          Option.of(getExplicitInsertHandler()), getTransformFunction(schema, hoodieConfig));
+          Option.of(getExplicitInsertHandler()), getCloningTransformer(schema, hoodieConfig));
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
       return result;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index 7dde24ec8c..39f55f027f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -70,21 +70,12 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O>
       throws IOException {
     // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
     Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
-    HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
     final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
     if (result) {
       boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
-      Option<IndexedRecord> avroOpt = savedCombineRecordOp
-          .flatMap(r -> {
-            try {
-              return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
-                  .map(HoodieAvroIndexedRecord::getData);
-            } catch (IOException e) {
-              LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
-              return Option.empty();
-            }
-          });
-      cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
+      Option<IndexedRecord> avroRecordOpt = savedCombineRecordOp.flatMap(r ->
+          toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps()));
+      cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : avroRecordOpt);
     }
     return result;
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index 4c9a7484c1..ce419f16e2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -68,21 +68,12 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
       throws IOException {
     // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
     Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
-    HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
     final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
     if (result) {
       boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
-      Option<IndexedRecord> avroOpt = savedCombineRecordOp
-          .flatMap(r -> {
-            try {
-              return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
-                  .map(HoodieAvroIndexedRecord::getData);
-            } catch (IOException e) {
-              LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
-              return Option.empty();
-            }
-          });
-      cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
+      Option<IndexedRecord> avroRecordOpt = savedCombineRecordOp.flatMap(r ->
+          toAvroRecord(r, writerSchema, config.getPayloadConfig().getProps()));
+      cdcLogger.put(newRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : avroRecordOpt);
     }
     return result;
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index db9422336d..8855457684 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
@@ -32,15 +34,12 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.apache.avro.Schema;
-
 import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.stream.Collectors;
 
 /**
@@ -91,8 +90,8 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
   }
 
   @Override
-  public List<HoodieRecord<T>> deduplicateRecordsInternal(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
+  protected List<HoodieRecord<T>> doDeduplicateRecords(
+      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
     // If index used is global, then records are expected to differ in their partitionPath
     Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
         .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
index ddf6345926..1e430cd699 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
@@ -64,7 +64,7 @@ public class JavaLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
     try {
       final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       bufferedIteratorExecutor =
-          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
+          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getCloningTransformer(schema));
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
       return result;
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 14016cb5c0..dc109f8103 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
@@ -29,13 +31,10 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.Schema;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.stream.Collectors;
 
 public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T>>,
@@ -58,8 +57,8 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
   }
 
   @Override
-  public List<HoodieRecord<T>> deduplicateRecordsInternal(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
+  protected List<HoodieRecord<T>> doDeduplicateRecords(
+      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
     boolean isIndexingGlobal = index.isGlobal();
     Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
       HoodieKey hoodieKey = record.getKey();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
index e111679842..9cdccbe407 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.commmon.model;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.client.model.HoodieInternalRow;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieKey;
@@ -28,18 +30,19 @@ import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
 import org.apache.hudi.util.HoodieSparkRecordUtils;
-
-import org.apache.avro.Schema;
 import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
 import org.apache.spark.sql.HoodieUnsafeRowUtils;
 import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
 import org.apache.spark.sql.catalyst.CatalystTypeConverters;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.JoinedRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -55,7 +58,19 @@ import static org.apache.spark.sql.types.DataTypes.BooleanType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
 /**
- * Spark Engine-specific Implementations of `HoodieRecord`.
+ * Spark Engine-specific Implementations of `HoodieRecord`
+ *
+ * NOTE: [[HoodieSparkRecord]] is expected to hold either [[UnsafeRow]] or [[HoodieInternalRow]]:
+ *
+ * <ul>
+ *    <li>[[UnsafeRow]] is held to make sure a) we don't deserialize raw bytes payload
+ *       into JVM types unnecessarily, b) we don't incur penalty of ser/de during shuffling,
+ *       c) we don't add strain on GC</li>
+ *    <li>[[HoodieInternalRow]] is held in cases when underlying [[UnsafeRow]]'s metadata fields
+ *       need to be updated (ie serving as an overlay layer on top of [[UnsafeRow]])</li>
+ * </ul>
+ *
+
  */
 public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
 
@@ -68,48 +83,38 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
    * We should use this construction method when we read internalRow from file.
    * The record constructed by this method must be used in iter.
    */
-  public HoodieSparkRecord(InternalRow data, StructType schema) {
+  public HoodieSparkRecord(InternalRow data) {
     super(null, data);
-    this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, false);
+    validateRow(data);
     this.copy = false;
   }
 
-  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema) {
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, boolean copy) {
     super(key, data);
-    this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true);
-    this.copy = true;
+    validateRow(data);
+    this.copy = copy;
   }
 
-  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation) {
+  private HoodieSparkRecord(HoodieKey key, InternalRow data, HoodieOperation operation, boolean copy) {
     super(key, data, operation);
-    this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true);
-    this.copy = true;
-  }
+    validateRow(data);
 
-  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, boolean copy) {
-    super(key, data, operation);
-    this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true);
     this.copy = copy;
   }
 
-  public HoodieSparkRecord(HoodieSparkRecord record) {
-    super(record);
-    this.copy = record.copy;
-  }
-
   @Override
   public HoodieSparkRecord newInstance() {
-    return new HoodieSparkRecord(this);
+    return new HoodieSparkRecord(this.key, this.data, this.operation, this.copy);
   }
 
   @Override
   public HoodieSparkRecord newInstance(HoodieKey key, HoodieOperation op) {
-    return new HoodieSparkRecord(key, data, null, op);
+    return new HoodieSparkRecord(key, this.data, op, this.copy);
   }
 
   @Override
   public HoodieSparkRecord newInstance(HoodieKey key) {
-    return new HoodieSparkRecord(key, data, null);
+    return new HoodieSparkRecord(key, this.data, this.operation, this.copy);
   }
 
   @Override
@@ -145,50 +150,56 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
   }
 
   @Override
-  public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException {
+  public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
     InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
-    return new HoodieSparkRecord(getKey(), mergeRow, targetStructType, getOperation(), copy);
+    UnsafeProjection projection =
+        HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType);
+    return new HoodieSparkRecord(getKey(), projection.apply(mergeRow), getOperation(), copy);
   }
 
   @Override
   public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
-    UTF8String[] metaFields = extractMetaField(structType, targetStructType);
-    if (metaFields.length == 0) {
-      throw new UnsupportedOperationException();
-    }
 
-    boolean containMetaFields = hasMetaField(structType);
-    InternalRow resultRow = new HoodieInternalRow(metaFields, data, containMetaFields);
-    return new HoodieSparkRecord(getKey(), resultRow, targetStructType, getOperation(), copy);
+    boolean containMetaFields = hasMetaFields(structType);
+    UTF8String[] metaFields = tryExtractMetaFields(data, structType);
+
+    // TODO add actual rewriting
+    InternalRow finalRow = new HoodieInternalRow(metaFields, data, containMetaFields);
+
+    return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy);
   }
 
   @Override
   public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
-    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
-    UTF8String[] metaFields = extractMetaField(structType, newStructType);
-    if (metaFields.length > 0) {
-      rewriteRow = new HoodieInternalRow(metaFields, data, true);
-    }
 
-    return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation(), copy);
+    boolean containMetaFields = hasMetaFields(structType);
+    UTF8String[] metaFields = tryExtractMetaFields(data, structType);
+
+    InternalRow rewrittenRow =
+        HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
+    HoodieInternalRow finalRow = new HoodieInternalRow(metaFields, rewrittenRow, containMetaFields);
+
+    return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy);
   }
 
   @Override
   public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(data, structType);
+
     metadataValues.getKv().forEach((key, value) -> {
       int pos = structType.fieldIndex(key);
       if (value != null) {
-        data.update(pos, CatalystTypeConverters.convertToCatalyst(value));
+        updatableRow.update(pos, CatalystTypeConverters.convertToCatalyst(value));
       }
     });
 
-    return new HoodieSparkRecord(getKey(), data, structType, getOperation(), copy);
+    return new HoodieSparkRecord(getKey(), updatableRow, getOperation(), copy);
   }
 
   @Override
@@ -242,7 +253,9 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     String key;
     String partition;
-    if (keyGen.isPresent() && !Boolean.parseBoolean(props.getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
+    boolean populateMetaFields = Boolean.parseBoolean(props.getOrDefault(POPULATE_META_FIELDS.key(),
+        POPULATE_META_FIELDS.defaultValue().toString()).toString());
+    if (!populateMetaFields && keyGen.isPresent()) {
       SparkKeyGeneratorInterface keyGenerator = (SparkKeyGeneratorInterface) keyGen.get();
       key = keyGenerator.getRecordKey(data, structType).toString();
       partition = keyGenerator.getPartitionPath(data, structType).toString();
@@ -251,7 +264,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
       partition = data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), StringType).toString();
     }
     HoodieKey hoodieKey = new HoodieKey(key, partition);
-    return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), copy);
+    return new HoodieSparkRecord(hoodieKey, data, getOperation(), copy);
   }
 
   @Override
@@ -268,7 +281,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
   public HoodieSparkRecord copy() {
     if (!copy) {
       this.data = this.data.copy();
-      copy = true;
+      this.copy = true;
     }
     return this;
   }
@@ -286,20 +299,29 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
     }
   }
 
-  private UTF8String[] extractMetaField(StructType recordStructType, StructType structTypeWithMetaField) {
-    return HOODIE_META_COLUMNS_WITH_OPERATION.stream()
-        .filter(f -> HoodieCatalystExpressionUtils$.MODULE$.existField(structTypeWithMetaField, f))
-        .map(field -> {
-          if (HoodieCatalystExpressionUtils$.MODULE$.existField(recordStructType, field)) {
-            return data.getUTF8String(HOODIE_META_COLUMNS_NAME_TO_POS.get(field));
-          } else {
-            return UTF8String.EMPTY_UTF8;
-          }
-        }).toArray(UTF8String[]::new);
+  private static HoodieInternalRow wrapIntoUpdatableOverlay(InternalRow data, StructType structType) {
+    if (data instanceof HoodieInternalRow) {
+      return (HoodieInternalRow) data;
+    }
+
+    boolean containsMetaFields = hasMetaFields(structType);
+    UTF8String[] metaFields = tryExtractMetaFields(data, structType);
+    return new HoodieInternalRow(metaFields, data, containsMetaFields);
   }
 
-  private static boolean hasMetaField(StructType structType) {
-    return HoodieCatalystExpressionUtils$.MODULE$.existField(structType, COMMIT_TIME_METADATA_FIELD);
+  private static UTF8String[] tryExtractMetaFields(InternalRow row, StructType structType) {
+    boolean containsMetaFields = hasMetaFields(structType);
+    if (containsMetaFields) {
+      return HoodieRecord.HOODIE_META_COLUMNS.stream()
+          .map(col -> row.getUTF8String(HOODIE_META_COLUMNS_NAME_TO_POS.get(col)))
+          .toArray(UTF8String[]::new);
+    } else {
+      return new UTF8String[HoodieRecord.HOODIE_META_COLUMNS.size()];
+    }
+  }
+
+  private static boolean hasMetaFields(StructType structType) {
+    return structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined();
   }
 
   /**
@@ -329,6 +351,14 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
 
     HoodieOperation operation = withOperationField
         ? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
-    return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, structType, operation, record.copy);
+    return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, operation, record.copy);
+  }
+
+  private static void validateRow(InternalRow data) {
+    // NOTE: [[HoodieSparkRecord]] is expected to hold either
+    //          - Instance of [[UnsafeRow]] or
+    //          - Instance of [[HoodieInternalRow]] or
+    //          - Instance of [[ColumnarBatchRow]]
+    ValidationUtils.checkState(data instanceof UnsafeRow || data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index f1ae56d70b..b6bb893e02 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -85,7 +85,7 @@ public class SparkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
       }
       bufferedIteratorExecutor =
           new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(),
-              getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
+              getCloningTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
       return result;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 0a042d3362..8673d2f5ba 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.execution.bulkinsert;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.HoodieInternalRowUtils;
 import org.apache.hudi.HoodieSparkUtils;
@@ -33,13 +35,9 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.RewriteAvroPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieClusteringConfig;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 
 import java.util.Properties;
@@ -80,7 +78,6 @@ public class RDDSpatialCurveSortPartitioner<T>
               schema.toString(),
               sparkEngineContext.getSqlContext().sparkSession()
           );
-
       Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
 
       return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty())
@@ -94,19 +91,17 @@ public class RDDSpatialCurveSortPartitioner<T>
           });
     } else if (recordType == HoodieRecordType.SPARK) {
       StructType structType = HoodieInternalRowUtils.getCachedSchema(schema.get());
-      Dataset<Row> sourceDataset = SparkConversionUtils.createDataFrame(records.rdd(), sparkEngineContext.getSqlContext().sparkSession(), structType);
-
+      Dataset<Row> sourceDataset = SparkConversionUtils.createDataFrame(records.rdd(),
+          sparkEngineContext.getSqlContext().sparkSession(), structType);
       Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
 
       return sortedDataset.queryExecution().toRdd()
           .toJavaRDD()
-          .map(row -> {
-            InternalRow internalRow = row.copy();
+          .map(internalRow -> {
             String key = internalRow.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
             String partition = internalRow.getString(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal());
             HoodieKey hoodieKey = new HoodieKey(key, partition);
-            HoodieRecord hoodieRecord = new HoodieSparkRecord(hoodieKey, internalRow, structType);
-            return hoodieRecord;
+            return (HoodieRecord) new HoodieSparkRecord(hoodieKey, internalRow, false);
           });
     } else {
       throw new UnsupportedOperationException(recordType.name());
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReader.java
index 86134eda37..553b084e29 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReader.java
@@ -18,36 +18,10 @@
 
 package org.apache.hudi.io.storage;
 
-import org.apache.hudi.HoodieInternalRowUtils;
-import org.apache.hudi.commmon.model.HoodieSparkRecord;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.common.util.MappingIterator;
-
-import org.apache.avro.Schema;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
-
-import java.io.IOException;
-
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-
-public interface HoodieSparkFileReader extends HoodieFileReader<InternalRow> {
 
-  ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema) throws IOException;
-
-  ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
-
-  default ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(Schema readerSchema) throws IOException {
-    ClosableIterator<InternalRow> iterator = getInternalRowIterator(readerSchema);
-    StructType structType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
-    return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieSparkRecord(data, structType)));
-  }
-
-  @Override
-  default ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
-    ClosableIterator<InternalRow> iterator = getInternalRowIterator(readerSchema, requestedSchema);
-    StructType structType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
-    return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieSparkRecord(data, structType)));
-  }
-}
+/**
+ * Marker interface for every {@link HoodieFileReader} reading in Catalyst (Spark native types, ie
+ * producing {@link InternalRow}s)
+ */
+public interface HoodieSparkFileReader extends HoodieFileReader<InternalRow> {}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index a46174cbae..b162cbbadc 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -22,16 +22,19 @@ import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.commmon.model.HoodieSparkRecord;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.common.util.ParquetReaderIterator;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.io.InputFile;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
@@ -41,6 +44,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
 public class HoodieSparkParquetReader implements HoodieSparkFileReader {
 
   private final Path path;
@@ -70,12 +75,20 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
   }
 
   @Override
-  public ClosableIterator<InternalRow> getInternalRowIterator(Schema schema) throws IOException {
-    return getInternalRowIterator(schema, null);
+  public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+    ClosableIterator<InternalRow> iterator = getInternalRowIterator(readerSchema, requestedSchema);
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
+    UnsafeProjection projection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType);
+
+    return new MappingIterator<>(iterator, data -> {
+      // NOTE: We have to do [[UnsafeProjection]] of incoming [[InternalRow]] to convert
+      //       it to [[UnsafeRow]] holding just raw bytes
+      UnsafeRow unsafeRow = projection.apply(data);
+      return unsafeCast(new HoodieSparkRecord(unsafeRow));
+    });
   }
 
-  @Override
-  public ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+  private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
     if (requestedSchema == null) {
       requestedSchema = readerSchema;
     }
@@ -85,13 +98,9 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
     conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), requestedStructType.json());
     conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), (Boolean) SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()));
     conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), (Boolean) SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()));
-    InputFile inputFile = HadoopInputFile.fromPath(path, conf);
-    ParquetReader<InternalRow> reader = new ParquetReader.Builder<InternalRow>(inputFile) {
-      @Override
-      protected ReadSupport getReadSupport() {
-        return new ParquetReadSupport();
-      }
-    }.withConf(conf).build();
+    ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder((ReadSupport) new ParquetReadSupport(), path)
+        .withConf(conf)
+        .build();
     ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader);
     readerIterators.add(parquetReaderIterator);
     return parquetReaderIterator;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 3828f63564..86a984dd83 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -74,8 +74,7 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
           HoodieRecord recordCopy = record.copy();
           String recKey = recordCopy.getRecordKey(reader.getSchema(), Option.of(keyGenerator));
           HoodieRecord hoodieRecord = recordCopy.rewriteRecord(reader.getSchema(), config.getProps(), HoodieAvroUtils.RECORD_KEY_SCHEMA);
-          MetadataValues metadataValues = new MetadataValues();
-          metadataValues.setRecordKey(recKey);
+          MetadataValues metadataValues = new MetadataValues().setRecordKey(recKey);
           return hoodieRecord
               .updateMetadataValues(HoodieAvroUtils.RECORD_KEY_SCHEMA, new Properties(), metadataValues)
               .newInstance(new HoodieKey(recKey, partitionPath));
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala
index 169ccd61c3..b6c331cbeb 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala
@@ -219,15 +219,6 @@ object HoodieInternalRowUtils {
     schemaMap.get(schema)
   }
 
-  def projectUnsafe(row: InternalRow, structType: StructType, copy: Boolean = true): InternalRow = {
-    if (row == null || row.isInstanceOf[UnsafeRow] || row.isInstanceOf[HoodieInternalRow]) {
-      row
-    } else {
-      val unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType).apply(row)
-      if (copy) unsafeRow.copy() else unsafeRow
-    }
-  }
-
   private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = {
     if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
       oldSchema match {
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 6f9616b669..4b692222ed 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -46,6 +46,11 @@ import java.util.Locale
  */
 trait SparkAdapter extends Serializable {
 
+  /**
+   * Checks whether provided instance of [[InternalRow]] is actually an instance of [[ColumnarBatchRow]]
+   */
+  def isColumnarBatchRow(r: InternalRow): Boolean
+
   /**
    * Returns an instance of [[HoodieCatalogUtils]] providing for common utils operating on Spark's
    * [[TableCatalog]]s
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index a714d60d00..2b19904407 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -41,7 +41,7 @@ import java.util.List;
 
 import scala.Tuple2;
 
-import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -98,7 +98,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness
     BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
     try {
       executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
-          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
+          getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
       int result = executor.execute();
       // It should buffer and write 100 records
       assertEquals(100, result);
@@ -145,7 +145,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness
     BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
     try {
       executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
-          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
+          getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
       BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> finalExecutor = executor;
 
       Thread.currentThread().interrupt();
@@ -194,7 +194,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness
 
     BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor =
         new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
-            consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+            consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA),
             getPreExecuteRunnable());
     executor.shutdownNow();
     boolean terminatedGracefully = executor.awaitTermination();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index a4f723ff01..b4fe688256 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -54,7 +54,7 @@ import java.util.stream.IntStream;
 
 import scala.Tuple2;
 
-import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -85,7 +85,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
     final int numRecords = 128;
     final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, numRecords);
     final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+        new BoundedInMemoryQueue(FileIOUtils.KB, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
     // Produce
     Future<Boolean> resFuture = executorService.submit(() -> {
       new IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue);
@@ -125,7 +125,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
     final List<List<HoodieRecord>> recs = new ArrayList<>();
 
     final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+        new BoundedInMemoryQueue(FileIOUtils.KB, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
 
     // Record Key to <Producer Index, Rec Index within a producer>
     Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new HashMap<>();
@@ -220,11 +220,11 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
     final int recordLimit = 5;
     final SizeEstimator<HoodieLazyInsertIterable.HoodieInsertValueGenResult> sizeEstimator = new DefaultSizeEstimator<>();
     HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult =
-        getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord) hoodieRecords.get(0));
+        getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord) hoodieRecords.get(0));
     final long objSize = sizeEstimator.sizeEstimate(genResult);
     final long memoryLimitInBytes = recordLimit * objSize;
     final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+        new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
 
     // Produce
     executorService.submit(() -> {
@@ -269,7 +269,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
     final SizeEstimator<Tuple2<HoodieRecord, Option<IndexedRecord>>> sizeEstimator = new DefaultSizeEstimator<>();
     // queue memory limit
     HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult =
-        getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0));
+        getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0));
     final long objSize = sizeEstimator.sizeEstimate(new Tuple2(genResult.getResult(), genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties())));
     final long memoryLimitInBytes = 4 * objSize;
 
@@ -277,7 +277,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
     // stops and throws
     // correct exception back.
     BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>> queue1 =
-        new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+        new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
 
     // Produce
     Future<Boolean> resFuture = executorService.submit(() -> {
@@ -305,7 +305,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
     when(mockHoodieRecordsIterator.hasNext()).thenReturn(true);
     when(mockHoodieRecordsIterator.next()).thenThrow(expectedException);
     BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>> queue2 =
-        new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+        new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA));
 
     // Produce
     Future<Boolean> res = executorService.submit(() -> {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 913bbcb97c..479c8eb9d6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -98,8 +98,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   }
 
   @Override
-  public HoodieRecord joinWith(HoodieRecord other,
-      Schema targetSchema) throws IOException {
+  public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
     GenericRecord record = HoodieAvroUtils.stitchRecords((GenericRecord) data, (GenericRecord) other.getData(), targetSchema);
     return new HoodieAvroIndexedRecord(record);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index dfd2b4ba33..5cbadece6b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -101,8 +101,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
   }
 
   @Override
-  public HoodieRecord joinWith(HoodieRecord other,
-      Schema targetSchema) throws IOException {
+  public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
     throw new UnsupportedOperationException();
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
index e57a18b592..b9e29787f8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
@@ -21,6 +21,8 @@ package org.apache.hudi.common.model;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -34,28 +36,33 @@ import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 
 public class HoodieAvroRecordMerger implements HoodieRecordMerger {
 
-  public static String DE_DUPING = "de_duping";
-
   @Override
   public String getMergingStrategy() {
     return HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
   }
 
   @Override
-  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException {
+  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
     ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.AVRO);
     ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.AVRO);
-    boolean deDuping = Boolean.parseBoolean(props.getOrDefault(DE_DUPING, "false").toString());
-    if (deDuping) {
-      HoodieRecord res = preCombine(older, newer);
-      if (res == older) {
-        return Option.of(Pair.of(res, oldSchema));
-      } else {
-        return Option.of(Pair.of(res, newSchema));
-      }
-    } else {
-      return combineAndGetUpdateValue(older, newer, newSchema, props)
-          .map(r -> Pair.of(r, (((HoodieAvroIndexedRecord) r).getData()).getSchema()));
+    Config.LegacyOperationMode legacyOperatingMode = Config.LegacyOperationMode.valueOf(
+            props.getString(Config.LEGACY_OPERATING_MODE.key(), Config.LEGACY_OPERATING_MODE.defaultValue()));
+
+    switch (legacyOperatingMode) {
+      case PRE_COMBINING:
+        HoodieRecord res = preCombine(older, newer);
+        if (res == older) {
+          return Option.of(Pair.of(res, oldSchema));
+        } else {
+          return Option.of(Pair.of(res, newSchema));
+        }
+
+      case COMBINING:
+        return combineAndGetUpdateValue(older, newer, newSchema, props)
+            .map(r -> Pair.of(r, (((HoodieAvroIndexedRecord) r).getData()).getSchema()));
+
+      default:
+        throw new UnsupportedOperationException(String.format("Unsupported legacy operating mode (%s)", legacyOperatingMode));
     }
   }
 
@@ -83,10 +90,25 @@ public class HoodieAvroRecordMerger implements HoodieRecordMerger {
         .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload));
   }
 
-  public static Properties withDeDuping(Properties props) {
-    Properties newProps = new Properties();
-    newProps.putAll(props);
-    newProps.setProperty(HoodieAvroRecordMerger.DE_DUPING, "true");
-    return newProps;
+  public static class Config {
+
+    public enum LegacyOperationMode {
+      PRE_COMBINING,
+      COMBINING
+    }
+
+    public static ConfigProperty<String> LEGACY_OPERATING_MODE =
+        ConfigProperty.key("hoodie.datasource.write.merger.legacy.operation")
+            .defaultValue(LegacyOperationMode.COMBINING.name())
+            .withDocumentation("Controls the mode of the merging operation performed by `HoodieAvroRecordMerger`. "
+                + "This is required to maintain backward-compatibility w/ the existing semantic of `HoodieRecordPayload` "
+                + "implementations providing `preCombine` and `combineAndGetUpdateValue` methods.");
+
+    public static TypedProperties withLegacyOperatingModePreCombining(Properties props) {
+      TypedProperties newProps = new TypedProperties();
+      newProps.putAll(props);
+      newProps.setProperty(Config.LEGACY_OPERATING_MODE.key(), Config.LegacyOperationMode.PRE_COMBINING.name());
+      return newProps;
+    }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index 74954a5e63..eca1ad9a3b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -92,8 +92,7 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
   }
 
   @Override
-  public HoodieRecord joinWith(HoodieRecord other,
-      Schema targetSchema) throws IOException {
+  public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
     throw new UnsupportedOperationException();
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 1acefe2204..363092409d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -291,7 +291,7 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
   /**
    * Support bootstrap.
    */
-  public abstract HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException;
+  public abstract HoodieRecord joinWith(HoodieRecord other, Schema targetSchema);
 
   /**
    * Rewrite record into new schema(add meta columns)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 4077752831..da413592ab 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -19,16 +19,15 @@
 package org.apache.hudi.common.model;
 
 import org.apache.avro.Schema;
-
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Properties;
 
 /**
  * HoodieMerge defines how to merge two records. It is a stateless component.
@@ -45,7 +44,7 @@ public interface HoodieRecordMerger extends Serializable {
    * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
    * of the single record, both orders of operations applications have to yield the same result)
    */
-  Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException;
+  Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;
 
   /**
    * The record type handled by the current merger.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
index a4ff9a1896..baee081633 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
@@ -18,46 +18,44 @@
 
 package org.apache.hudi.common.model;
 
-import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION;
-
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.hudi.common.util.ValidationUtils;
 
 public class MetadataValues {
-  private Map<String, String> kv;
-
-  public MetadataValues(Map<String, String> kv) {
-    ValidationUtils.checkArgument(HOODIE_META_COLUMNS_WITH_OPERATION.containsAll(kv.values()));
-    this.kv = kv;
-  }
+  private final Map<String, String> kv;
 
   public MetadataValues() {
     this.kv = new HashMap<>();
   }
 
-  public void setCommitTime(String value) {
+  public MetadataValues setCommitTime(String value) {
     this.kv.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, value);
+    return this;
   }
 
-  public void setCommitSeqno(String value) {
+  public MetadataValues setCommitSeqno(String value) {
     this.kv.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, value);
+    return this;
   }
 
-  public void setRecordKey(String value) {
+  public MetadataValues setRecordKey(String value) {
     this.kv.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, value);
+    return this;
   }
 
-  public void setPartitionPath(String value) {
+  public MetadataValues setPartitionPath(String value) {
     this.kv.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, value);
+    return this;
   }
 
-  public void setFileName(String value) {
+  public MetadataValues setFileName(String value) {
     this.kv.put(HoodieRecord.FILENAME_METADATA_FIELD, value);
+    return this;
   }
 
-  public void setOperation(String value) {
+  public MetadataValues setOperation(String value) {
     this.kv.put(HoodieRecord.OPERATION_METADATA_FIELD, value);
+    return this;
   }
 
   public Map<String, String> getKv() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 8a4901019c..862cea8f79 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -96,7 +97,7 @@ public abstract class AbstractHoodieLogRecordReader {
   protected final String preCombineField;
   // Stateless component for merging records
   protected final HoodieRecordMerger recordMerger;
-  private final Properties payloadProps = new Properties();
+  private final TypedProperties payloadProps;
   // simple key gen fields
   private Option<Pair<String, String>> simpleKeyGenFields = Option.empty();
   // Log File Paths
@@ -164,10 +165,11 @@ public abstract class AbstractHoodieLogRecordReader {
     this.payloadClassFQN = tableConfig.getPayloadClass();
     this.preCombineField = tableConfig.getPreCombineField();
     // Log scanner merge log with precombine
-    this.payloadProps.setProperty(HoodieAvroRecordMerger.DE_DUPING, "true");
+    TypedProperties props = HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining(new Properties());
     if (this.preCombineField != null) {
-      this.payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, this.preCombineField);
+      props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, this.preCombineField);
     }
+    this.payloadProps = props;
     this.recordMerger = recordMerger;
     this.totalLogFiles.addAndGet(logFilePaths.size());
     this.logFilePaths = logFilePaths;
@@ -475,14 +477,12 @@ public abstract class AbstractHoodieLogRecordReader {
   }
 
   private ClosableIterator<HoodieRecord> getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt, HoodieRecordType type) throws IOException {
-    ClosableIterator<HoodieRecord> iter;
     if (keySpecOpt.isPresent()) {
       KeySpec keySpec = keySpecOpt.get();
-      iter =  unsafeCast(dataBlock.getRecordIterator(keySpec.keys, keySpec.fullKey, type));
+      return unsafeCast(dataBlock.getRecordIterator(keySpec.keys, keySpec.fullKey, type));
     } else {
-      iter = unsafeCast(dataBlock.getRecordIterator(type));
+      return unsafeCast(dataBlock.getRecordIterator(type));
     }
-    return iter;
   }
 
   /**
@@ -524,7 +524,7 @@ public abstract class AbstractHoodieLogRecordReader {
     return withOperationField;
   }
 
-  protected Properties getPayloadProps() {
+  protected TypedProperties getPayloadProps() {
     return payloadProps;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 42294c3d11..7d2e2a654a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -155,7 +155,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
 
       HoodieRecord<T> oldRecord = records.get(key);
       T oldValue = oldRecord.getData();
-      HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(oldRecord, readerSchema, newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
+      HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(oldRecord, readerSchema,
+          newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
       // If combinedValue is oldValue, no need rePut oldRecord
       if (combinedRecord.getData() != oldValue) {
         records.put(key, combinedRecord.copy());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 19ecdbf9ee..2e499f4ae9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -179,7 +179,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     }
 
     HashSet<String> keySet = new HashSet<>(keys);
-    return FilteringIterator.getInstance(allRecords, keySet, fullKey, record -> getRecordKey(record));
+    return FilteringIterator.getInstance(allRecords, keySet, fullKey, this::getRecordKey);
   }
 
   protected <T> ClosableIterator<HoodieRecord<T>> readRecordsFromBlockPayload(HoodieRecordType type) throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index faa7856e43..8f23dce02a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.table.log.block;
 
-import java.util.Properties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -34,19 +33,15 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.inline.InLineFSUtils;
 import org.apache.hudi.common.fs.inline.InLineFileSystem;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockContentLocation;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
 import org.apache.hudi.io.storage.HoodieAvroHFileReader;
+import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -58,10 +53,11 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.TreeMap;
 
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * HoodieHFileDataBlock contains a list of records stored inside an HFile format. It is used with the HFile
@@ -205,10 +201,10 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
              new HoodieAvroHFileReader(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf));
 
     // Get writer's schema from the header
-    final ClosableIterator<IndexedRecord> recordIterator =
-        fullKey ? reader.getIndexedRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getIndexedRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
+    final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
+        fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
 
-    return new MappingIterator<>(recordIterator, data -> (HoodieRecord<T>) new HoodieAvroIndexedRecord((data)));
+    return new MappingIterator<>(recordIterator, data -> (HoodieRecord<T>) data);
   }
 
   private byte[] serializeRecord(HoodieRecord<?> record, Schema schema) throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
index 42ea766eb9..f681bdfe84 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
@@ -32,11 +32,11 @@ import java.io.IOException;
 public class ParquetReaderIterator<T> implements ClosableIterator<T> {
 
   // Parquet reader for an existing parquet file
-  private final ParquetReader<T> parquetReader;
+  private final ParquetReader<? extends T> parquetReader;
   // Holds the next entry returned by the parquet reader
   private T next;
 
-  public ParquetReaderIterator(ParquetReader<T> parquetReader) {
+  public ParquetReaderIterator(ParquetReader<? extends T> parquetReader) {
     this.parquetReader = parquetReader;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/VisibleForTesting.java b/hudi-common/src/main/java/org/apache/hudi/common/util/VisibleForTesting.java
new file mode 100644
index 0000000000..c6e4e488f0
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/VisibleForTesting.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+/**
+ * Annotation designating a field or a method as visible for testing purposes
+ */
+public @interface VisibleForTesting {
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
index c2ddfa319f..a829880d5f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
@@ -18,71 +18,10 @@
 
 package org.apache.hudi.io.storage;
 
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.common.util.MappingIterator;
-import org.apache.hudi.common.util.Option;
-
-import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-
-public interface HoodieAvroFileReader extends HoodieFileReader<IndexedRecord>, AutoCloseable {
-
-  ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException;
-
-  ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
-
-  default Option<IndexedRecord> getIndexedRecordByKey(String key, Schema readerSchema) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  default ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  default ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<String> keys) throws IOException {
-    return getIndexedRecordsByKeysIterator(keys, getSchema());
-  }
-
-  default ClosableIterator<IndexedRecord> getIndexedRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema schema) throws IOException {
-    throw new UnsupportedEncodingException();
-  }
-
-  default ClosableIterator<IndexedRecord> getIndexedRecordsByKeyPrefixIterator(List<String> keyPrefixes) throws IOException {
-    return getIndexedRecordsByKeyPrefixIterator(keyPrefixes, getSchema());
-  }
-
-  default ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
-    ClosableIterator<IndexedRecord> iterator = getIndexedRecordsByKeysIterator(keys, schema);
-    return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
-  }
-
-  default ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema schema) throws IOException {
-    ClosableIterator<IndexedRecord> iterator = getIndexedRecordsByKeyPrefixIterator(keyPrefixes, schema);
-    return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
-  }
-
-  @Override
-  default ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema schema) throws IOException {
-    ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(schema);
-    return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
-  }
-
-  @Override
-  default ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
-    ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema);
-    return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
-  }
-
-  default Option<HoodieRecord<IndexedRecord>> getRecordByKey(String key, Schema readerSchema) throws IOException {
-    return getIndexedRecordByKey(key, readerSchema)
-        .map(data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
-  }
-}
+/**
+ * Marker interface for every {@link HoodieFileReader} that reads Avro (i.e.,
+ * produces {@link IndexedRecord}s)
+ */
+public interface HoodieAvroFileReader extends HoodieFileReader<IndexedRecord> {}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
new file mode 100644
index 0000000000..5cee50449a
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.MappingIterator;
+
+import java.io.IOException;
+
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
+/**
+ * Base class for every {@link HoodieAvroFileReader}, wrapping the {@link IndexedRecord}s being read into {@link HoodieRecord}s
+ */
+abstract class HoodieAvroFileReaderBase implements HoodieAvroFileReader {
+
+  @Override
+  public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+    ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema);
+    return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
+  }
+
+  protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException {
+    return getIndexedRecordIterator(readerSchema, readerSchema);
+  }
+
+  protected abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
index 90323d8883..7a70a1f2a3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
@@ -37,8 +37,12 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -53,10 +57,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
@@ -64,7 +69,7 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
  * <p>
  * {@link HoodieFileReader} implementation allowing to read from {@link HFile}.
  */
-public class HoodieAvroHFileReader implements HoodieAvroFileReader {
+public class HoodieAvroHFileReader extends HoodieAvroFileReaderBase implements HoodieSeekingFileReader<IndexedRecord> {
 
   // TODO HoodieHFileReader right now tightly coupled to MT, we should break that coupling
   public static final String SCHEMA_KEY = "schema";
@@ -114,6 +119,30 @@ public class HoodieAvroHFileReader implements HoodieAvroFileReader {
         .orElseGet(() -> Lazy.lazily(() -> fetchSchema(reader)));
   }
 
+  @Override
+  public Option<HoodieRecord<IndexedRecord>> getRecordByKey(String key, Schema readerSchema) throws IOException {
+    synchronized (sharedScannerLock) {
+      return fetchRecordByKeyInternal(sharedScanner, key, getSchema(), readerSchema)
+          .map(data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
+    }
+  }
+
+  @Override
+  public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
+    // We're caching blocks for this scanner to minimize the amount of traffic
+    // to the underlying storage, since we fetch (potentially) sparsely distributed
+    // keys
+    HFileScanner scanner = getHFileScanner(reader, true);
+    ClosableIterator<IndexedRecord> iterator = new RecordByKeyIterator(scanner, keys, getSchema(), schema);
+    return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
+  }
+
+  @Override
+  public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema schema) throws IOException {
+    ClosableIterator<IndexedRecord> iterator = getIndexedRecordsByKeyPrefixIterator(keyPrefixes, schema);
+    return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
+  }
+
   @Override
   public String[] readMinMaxRecordKeys() {
     // NOTE: This access to reader is thread-safe
@@ -169,28 +198,19 @@ public class HoodieAvroHFileReader implements HoodieAvroFileReader {
     }
   }
 
-  @SuppressWarnings("unchecked")
   @Override
-  public Option<IndexedRecord> getIndexedRecordByKey(String key, Schema readerSchema) throws IOException {
-    synchronized (sharedScannerLock) {
-      return fetchRecordByKeyInternal(sharedScanner, key, getSchema(), readerSchema);
+  protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
+    if (!Objects.equals(readerSchema, requestedSchema)) {
+      throw new UnsupportedOperationException("Schema projections are not supported in HFile reader");
     }
-  }
 
-  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException {
     // TODO eval whether seeking scanner would be faster than pread
     HFileScanner scanner = getHFileScanner(reader, false);
     return new RecordIterator(scanner, getSchema(), readerSchema);
   }
 
-  @Override
-  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<String> keys, Schema readerSchema) throws IOException {
+  @VisibleForTesting
+  protected ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<String> keys, Schema readerSchema) throws IOException {
     // We're caching blocks for this scanner to minimize amount of traffic
     // to the underlying storage as we fetched (potentially) sparsely distributed
     // keys
@@ -198,9 +218,8 @@ public class HoodieAvroHFileReader implements HoodieAvroFileReader {
     return new RecordByKeyIterator(scanner, keys, getSchema(), readerSchema);
   }
 
-  @SuppressWarnings("unchecked")
-  @Override
-  public ClosableIterator<IndexedRecord> getIndexedRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema readerSchema) throws IOException {
+  @VisibleForTesting
+  protected ClosableIterator<IndexedRecord> getIndexedRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema readerSchema) throws IOException {
     // We're caching blocks for this scanner to minimize amount of traffic
     // to the underlying storage as we fetched (potentially) sparsely distributed
     // keys
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
index 051add1102..77b86fe1f4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
@@ -36,11 +36,13 @@ import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 
 import java.io.IOException;
+import java.util.Objects;
 import java.util.Set;
 
-public class HoodieAvroOrcReader implements HoodieAvroFileReader {
-  private Path path;
-  private Configuration conf;
+public class HoodieAvroOrcReader extends HoodieAvroFileReaderBase {
+
+  private final Path path;
+  private final Configuration conf;
   private final BaseFileUtils orcUtils;
 
   public HoodieAvroOrcReader(Configuration configuration, Path path) {
@@ -65,30 +67,28 @@ public class HoodieAvroOrcReader implements HoodieAvroFileReader {
   }
 
   @Override
-  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema schema) throws IOException {
+  protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+    if (!Objects.equals(readerSchema, requestedSchema)) {
+      throw new UnsupportedOperationException("Schema projections are not supported in ORC reader");
+    }
+
     try {
       Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
-      TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(schema);
+      TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readerSchema);
       RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));
-      return new OrcReaderIterator<>(recordReader, schema, orcSchema);
+      return new OrcReaderIterator<>(recordReader, readerSchema, orcSchema);
     } catch (IOException io) {
       throw new HoodieIOException("Unable to create an ORC reader.", io);
     }
   }
 
-  @Override
-  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public Schema getSchema() {
     return orcUtils.readAvroSchema(conf, path);
   }
 
   @Override
-  public void close() {
-  }
+  public void close() {}
 
   @Override
   public long getTotalRecords() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
index 769ef391b4..e249da0b8a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
@@ -23,9 +23,12 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetReaderIterator;
 import org.apache.parquet.avro.AvroParquetReader;
@@ -37,12 +40,14 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
-public class HoodieAvroParquetReader implements HoodieAvroFileReader {
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
+public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase {
 
   private final Path path;
   private final Configuration conf;
   private final BaseFileUtils parquetUtils;
-  private List<ParquetReaderIterator> readerIterators = new ArrayList<>();
+  private final List<ParquetReaderIterator> readerIterators = new ArrayList<>();
 
   public HoodieAvroParquetReader(Configuration configuration, Path path) {
     this.conf = configuration;
@@ -50,6 +55,15 @@ public class HoodieAvroParquetReader implements HoodieAvroFileReader {
     this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
   }
 
+  @Override
+  public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema) throws IOException {
+    // TODO(HUDI-4588) remove after HUDI-4588 is resolved
+    // NOTE: This is a workaround to avoid leveraging projection w/in [[AvroParquetReader]],
+    //       until schema handling issues (nullability canonicalization, etc) are resolved
+    ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema);
+    return new MappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
+  }
+
   @Override
   public String[] readMinMaxRecordKeys() {
     return parquetUtils.readMinMaxRecordKeys(conf, path);
@@ -66,12 +80,12 @@ public class HoodieAvroParquetReader implements HoodieAvroFileReader {
   }
 
   @Override
-  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema schema) throws IOException {
+  protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema schema) throws IOException {
     return getIndexedRecordIteratorInternal(schema, Option.empty());
   }
 
   @Override
-  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+  protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
     return getIndexedRecordIteratorInternal(readerSchema, Option.of(requestedSchema));
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
index 8152a176a0..30dbc5b94e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
@@ -22,13 +22,23 @@ import org.apache.avro.Schema;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.common.util.Option;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.List;
 import java.util.Set;
 
+/**
+ * Hudi's File Reader interface providing common set of APIs to fetch
+ *
+ * <ul>
+ *   <li>{@link HoodieRecord}s</li>
+ *   <li>Metadata (statistics, bloom-filters, etc)</li>
+ * </ul>
+ *
+ * from a file persisted in storage.
+ *
+ * @param <T> target engine-specific representation of the raw data ({@code IndexedRecord} for Avro,
+ *           {@code InternalRow} for Spark, etc)
+ */
 public interface HoodieFileReader<T> extends AutoCloseable {
 
   String[] readMinMaxRecordKeys();
@@ -37,36 +47,14 @@ public interface HoodieFileReader<T> extends AutoCloseable {
 
   Set<String> filterRowKeys(Set<String> candidateRowKeys);
 
-  ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema) throws IOException;
-
   ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
 
-  default ClosableIterator<HoodieRecord<T>> getRecordIterator() throws IOException {
-    return getRecordIterator(getSchema());
+  default ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema) throws IOException {
+    return getRecordIterator(readerSchema, readerSchema);
   }
 
-  default Option<HoodieRecord<T>> getRecordByKey(String key, Schema readerSchema) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  default Option<HoodieRecord<T>> getRecordByKey(String key) throws IOException {
-    return getRecordByKey(key, getSchema());
-  }
-
-  default ClosableIterator<HoodieRecord<T>> getRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  default ClosableIterator<HoodieRecord<T>> getRecordsByKeysIterator(List<String> keys) throws IOException {
-    return getRecordsByKeysIterator(keys, getSchema());
-  }
-
-  default ClosableIterator<HoodieRecord<T>> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema schema) throws IOException {
-    throw new UnsupportedEncodingException();
-  }
-
-  default ClosableIterator<HoodieRecord<T>> getRecordsByKeyPrefixIterator(List<String> keyPrefixes) throws IOException {
-    return getRecordsByKeyPrefixIterator(keyPrefixes, getSchema());
+  default ClosableIterator<HoodieRecord<T>> getRecordIterator() throws IOException {
+    return getRecordIterator(getSchema());
   }
 
   Schema getSchema();
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java
similarity index 75%
copy from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
copy to hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java
index 8152a176a0..ee4a5f5e7a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.avro.Schema;
-import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
@@ -27,23 +26,8 @@ import org.apache.hudi.common.util.Option;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
-import java.util.Set;
 
-public interface HoodieFileReader<T> extends AutoCloseable {
-
-  String[] readMinMaxRecordKeys();
-
-  BloomFilter readBloomFilter();
-
-  Set<String> filterRowKeys(Set<String> candidateRowKeys);
-
-  ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema) throws IOException;
-
-  ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
-
-  default ClosableIterator<HoodieRecord<T>> getRecordIterator() throws IOException {
-    return getRecordIterator(getSchema());
-  }
+public interface HoodieSeekingFileReader<T> extends HoodieFileReader<T> {
 
   default Option<HoodieRecord<T>> getRecordByKey(String key, Schema readerSchema) throws IOException {
     throw new UnsupportedOperationException();
@@ -69,9 +53,4 @@ public interface HoodieFileReader<T> extends AutoCloseable {
     return getRecordsByKeyPrefixIterator(keyPrefixes, getSchema());
   }
 
-  Schema getSchema();
-
-  void close();
-
-  long getTotalRecords();
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index f777c55e89..b22ebc1ad7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -50,12 +53,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
-import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.io.storage.HoodieSeekingFileReader;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -97,7 +96,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
   private final boolean reuse;
 
   // Readers for the latest file slice corresponding to file groups in the metadata partition
-  private Map<Pair<String, String>, Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>> partitionReaders =
+  private final Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader>> partitionReaders =
       new ConcurrentHashMap<>();
 
   public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
@@ -164,12 +163,12 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
             (SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
               // NOTE: Since this will be executed by executors, we can't access previously cached
               //       readers, and therefore have to always open new ones
-              Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+              Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers =
                   openReaders(partitionName, fileSlice);
               try {
                 List<Long> timings = new ArrayList<>();
 
-                HoodieFileReader baseFileReader = readers.getKey();
+                HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
                 HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
 
                 if (baseFileReader == null && logRecordScanner == null) {
@@ -210,11 +209,11 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
     AtomicInteger fileSlicesKeysCount = new AtomicInteger();
     partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> {
-      Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+      Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers =
           getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
       try {
         List<Long> timings = new ArrayList<>();
-        HoodieFileReader baseFileReader = readers.getKey();
+        HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
         HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
         if (baseFileReader == null && logRecordScanner == null) {
           return;
@@ -281,7 +280,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     return logRecords;
   }
 
-  private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader,
+  private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader,
                                                                                                              List<String> keys,
                                                                                                              boolean fullKeys,
                                                                                                              Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
@@ -290,7 +289,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     HoodieTimer timer = new HoodieTimer().startTimer();
     timer.startTimer();
 
-    if (baseFileReader == null) {
+    if (reader == null) {
       // No base file at all
       timings.add(timer.endTimer());
       if (fullKeys) {
@@ -310,7 +309,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     readTimer.startTimer();
 
     Map<String, HoodieRecord<HoodieMetadataPayload>> records =
-        fetchBaseFileRecordsByKeys(baseFileReader, keys, fullKeys, partitionName);
+        fetchBaseFileRecordsByKeys(reader, keys, fullKeys, partitionName);
 
     metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
 
@@ -342,12 +341,14 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     }
   }
 
-  private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieFileReader baseFileReader,
+  @SuppressWarnings("unchecked")
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieSeekingFileReader reader,
                                                                                       List<String> keys,
                                                                                       boolean fullKeys,
                                                                                       String partitionName) throws IOException {
-    ClosableIterator<HoodieRecord> records = fullKeys ? baseFileReader.getRecordsByKeysIterator(keys)
-        : baseFileReader.getRecordsByKeyPrefixIterator(keys);
+    ClosableIterator<HoodieRecord<?>> records = fullKeys
+        ? reader.getRecordsByKeysIterator(keys)
+        : reader.getRecordsByKeyPrefixIterator(keys);
 
     return toStream(records)
         .map(record -> {
@@ -402,21 +403,21 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
    * @param slice            - The file slice to open readers for
    * @return File reader and the record scanner pair for the requested file slice
    */
-  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
+  private Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
     if (reuse) {
-      return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> {
-        return openReaders(partitionName, slice); });
+      Pair<String, String> key = Pair.of(partitionName, slice.getFileId());
+      return partitionReaders.computeIfAbsent(key, ignored -> openReaders(partitionName, slice));
     } else {
       return openReaders(partitionName, slice);
     }
   }
 
-  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) {
+  private Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) {
     try {
       HoodieTimer timer = new HoodieTimer().startTimer();
       // Open base file reader
-      Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer);
-      HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey();
+      Pair<HoodieSeekingFileReader<?>, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer);
+      HoodieSeekingFileReader<?> baseFileReader = baseFileReaderOpenTimePair.getKey();
       final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
 
       // Open the log record scanner using the log files from the latest file slice
@@ -434,18 +435,20 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     }
   }
 
-  private Pair<HoodieFileReader, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
-    HoodieFileReader baseFileReader = null;
+  private Pair<HoodieSeekingFileReader<?>, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
+    HoodieSeekingFileReader<?> baseFileReader;
     Long baseFileOpenMs;
     // If the base file is present then create a reader
     Option<HoodieBaseFile> basefile = slice.getBaseFile();
     if (basefile.isPresent()) {
-      String basefilePath = basefile.get().getPath();
-      baseFileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(hadoopConf.get(), new Path(basefilePath));
+      String baseFilePath = basefile.get().getPath();
+      baseFileReader = (HoodieSeekingFileReader<?>) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+          .getFileReader(hadoopConf.get(), new Path(baseFilePath));
       baseFileOpenMs = timer.endTimer();
-      LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", basefilePath,
+      LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", baseFilePath,
           basefile.get().getCommitTime(), baseFileOpenMs));
     } else {
+      baseFileReader = null;
       baseFileOpenMs = 0L;
       timer.endTimer();
     }
@@ -572,7 +575,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
    * @param partitionFileSlicePair - Partition and FileSlice
    */
   private synchronized void close(Pair<String, String> partitionFileSlicePair) {
-    Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers =
         partitionReaders.remove(partitionFileSlicePair);
     closeReader(readers);
   }
@@ -587,7 +590,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     partitionReaders.clear();
   }
 
-  private void closeReader(Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers) {
+  private void closeReader(Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers) {
     if (readers != null) {
       try {
         if (readers.getKey() != null) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 1b3bc83547..459dff0b45 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -25,9 +25,11 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.ConcatenatingIterator;
 import org.apache.hudi.common.model.ClusteringGroupInfo;
 import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieFileSliceReader;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -301,8 +303,10 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
     List<Iterator<RowData>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> {
       Iterable<IndexedRecord> indexedRecords = () -> {
         try {
-          return ((HoodieAvroFileReader)HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType())
-              .getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()))).getIndexedRecordIterator(readerSchema);
+          HoodieFileReaderFactory fileReaderFactory = HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType());
+          HoodieAvroFileReader fileReader = (HoodieAvroFileReader) fileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
+
+          return new MappingIterator<>(fileReader.getRecordIterator(readerSchema), HoodieRecord::getData);
         } catch (IOException e) {
           throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
               + " and " + clusteringOp.getDeltaFilePaths(), e);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index e9accb0a86..4daf8df523 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.format.mor;
 
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -715,7 +716,7 @@ public class MergeOnReadInputFormat
 
     private final Set<String> keyToSkip = new HashSet<>();
 
-    private final Properties payloadProps;
+    private final TypedProperties payloadProps;
 
     private RowData currentRecord;
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
index 5b7e7fbc67..a3b4a6c166 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.io.storage.HoodieAvroHFileReader;
 
@@ -41,7 +42,7 @@ public class HoodieHFileRecordReader implements RecordReader<NullWritable, Array
   private long count = 0;
   private ArrayWritable valueObj;
   private HoodieAvroHFileReader reader;
-  private Iterator<IndexedRecord> recordIterator;
+  private Iterator<HoodieRecord<IndexedRecord>> recordIterator;
   private Schema schema;
 
   public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException {
@@ -56,14 +57,14 @@ public class HoodieHFileRecordReader implements RecordReader<NullWritable, Array
   @Override
   public boolean next(NullWritable key, ArrayWritable value) throws IOException {
     if (recordIterator == null) {
-      recordIterator = reader.getIndexedRecordIterator(schema);
+      recordIterator = reader.getRecordIterator(schema);
     }
 
     if (!recordIterator.hasNext()) {
       return false;
     }
 
-    IndexedRecord record = recordIterator.next();
+    IndexedRecord record = recordIterator.next().getData();
     ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schema);
     value.set(aWritable.get());
     count++;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 0644656ce4..7d8aa9f504 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.TypeUtils;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -269,7 +270,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
       Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
       HoodieAvroFileReader reader = TypeUtils.unsafeCast(HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(metaClient.getHadoopConf(),
           new Path(fileSlice.getBaseFile().get().getPath())));
-      return reader.getIndexedRecordIterator(schema);
+      return new MappingIterator<>(reader.getRecordIterator(schema), HoodieRecord::getData);
     } else {
       // If there is no data file, fall back to reading log files
       HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
index 04daafbab0..ff51e02702 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
@@ -18,17 +18,16 @@
 
 package org.apache.hudi;
 
+import org.apache.avro.Schema;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-
-import org.apache.avro.Schema;
+import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.IOException;
-import java.util.Properties;
 
 public class HoodieSparkRecordMerger implements HoodieRecordMerger {
 
@@ -38,7 +37,7 @@ public class HoodieSparkRecordMerger implements HoodieRecordMerger {
   }
 
   @Override
-  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException {
+  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
     ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
     ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 53380c9aa6..9bbf0a9414 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -752,9 +752,9 @@ object HoodieBaseRelation extends SparkAdapterSupport {
       val requiredAvroSchema = new Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
       val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema)
 
-      reader.getIndexedRecordIterator(requiredAvroSchema).asScala
+      reader.getRecordIterator(requiredAvroSchema).asScala
         .map(record => {
-          avroToRowConverter.apply(record.asInstanceOf[GenericRecord]).get
+          avroToRowConverter.apply(record.getData.asInstanceOf[GenericRecord]).get
         })
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 1b8cb57936..99313884c3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -355,18 +355,7 @@ object HoodieSparkSqlWriter {
     processedRecord
   }
 
-  def getSparkProcessedRecord(partitionParam: String, record: InternalRow,
-                              dropPartitionColumns: Boolean, schema: StructType): (InternalRow, StructType) = {
-    var processedRecord = record
-    var writeSchema = schema
-    if (dropPartitionColumns) {
-      writeSchema = generateSparkSchemaWithoutPartitionColumns(partitionParam, schema)
-      processedRecord = HoodieInternalRowUtils.rewriteRecord(record, schema, writeSchema)
-    }
-    (processedRecord, writeSchema)
-  }
-
-  def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = {
+  private def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = {
     val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false"
     parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(internalSchemaOpt.getOrElse(null)),
       HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable)
@@ -874,16 +863,23 @@ object HoodieSparkSqlWriter {
         // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
         val structType = HoodieInternalRowUtils.getCachedSchema(schema)
-        df.queryExecution.toRdd.map(row => {
-          val internalRow = row.copy()
-          val (processedRow, writeSchema) = getSparkProcessedRecord(partitionCols, internalRow, dropPartitionColumns, structType)
-          val recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType)
-          val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType)
-          val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-
-          new HoodieSparkRecord(key, processedRow, writeSchema)
-        }).toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
+        df.queryExecution.toRdd.mapPartitions { iter =>
+          val projection: Function[InternalRow, InternalRow] = if (dropPartitionColumns) {
+            val newSchema = generateSparkSchemaWithoutPartitionColumns(partitionCols, structType)
+            HoodieInternalRowUtils.getCachedUnsafeProjection(structType, newSchema)
+          } else {
+            identity
+          }
+
+          iter.map { internalRow =>
+            val processedRow = projection(internalRow)
+            val recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType)
+            val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType)
+            val key = new HoodieKey(recordKey.toString, partitionPath.toString)
+
+            new HoodieSparkRecord(key, processedRow, false)
+          }
+        }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
     }
   }
-
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
index f5a3834304..256bc14e82 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
@@ -18,36 +18,36 @@
 
 package org.apache.hudi
 
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection}
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
-import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieEmptyRecord, HoodieLogFile, HoodieRecord}
-import org.apache.hudi.config.HoodiePayloadConfig
-import org.apache.hudi.commmon.model.HoodieSparkRecord
-import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.hudi.LogFileIterator._
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext}
-import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieEmptyRecord, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
 import org.apache.hudi.common.util.HoodieRecordUtils
+import org.apache.hudi.config.HoodiePayloadConfig
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
 import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
-import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
 import org.apache.spark.sql.HoodieCatalystExpressionUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import org.apache.spark.sql.types.StructType
+
 import java.io.Closeable
-import java.util.Properties
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.util.Try
@@ -65,13 +65,14 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
 
   protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))
 
-  protected val payloadProps = tableState.preCombineFieldOpt
+  protected val payloadProps: TypedProperties = tableState.preCombineFieldOpt
     .map { preCombineField =>
       HoodiePayloadConfig.newBuilder
         .withPayloadOrderingField(preCombineField)
         .build
         .getProps
-    }.getOrElse(new Properties())
+    }
+    .getOrElse(new TypedProperties())
 
   protected override val avroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
   protected override val structTypeSchema: StructType = requiredSchema.structTypeSchema
@@ -82,8 +83,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
   protected var recordToLoad: InternalRow = _
 
   protected val requiredSchemaSafeAvroProjection: SafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema)
-
-  protected val requiredSchemaSafeRowProjection: UnsafeProjection = HoodieCatalystExpressionUtils.generateUnsafeProjection(logFileReaderStructType, structTypeSchema)
+  protected val requiredSchemaUnsafeRowProjection: UnsafeProjection = HoodieCatalystExpressionUtils.generateUnsafeProjection(logFileReaderStructType, structTypeSchema)
 
   // TODO: now logScanner with internalSchema support column project, we may no need projectAvroUnsafe
   private var logScanner = {
@@ -122,7 +122,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
           recordToLoad = deserialize(projectedAvroRecord)
           true
         case Some(r: HoodieSparkRecord) =>
-          recordToLoad = requiredSchemaSafeRowProjection(r.getData)
+          recordToLoad = requiredSchemaUnsafeRowProjection(r.getData)
           true
         case None => this.hasNextInternal
       }
@@ -238,7 +238,7 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
     //       on the record from the Delta Log
     recordMerger.getRecordType match {
       case HoodieRecordType.SPARK =>
-        val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema)
+        val curRecord = new HoodieSparkRecord(curRow)
         val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps)
         toScalaOption(result)
           .map(r => {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index 371a2d7b41..dc408ee919 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.bootstrap;
 
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
@@ -38,13 +39,10 @@ import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
-
-import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
@@ -94,12 +92,11 @@ public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBoots
       } else if (recordType == HoodieRecordType.SPARK) {
         SparkKeyGeneratorInterface sparkKeyGenerator = (SparkKeyGeneratorInterface) keyGenerator;
         StructType structType = inputDataset.schema();
-        return inputDataset.queryExecution().toRdd().toJavaRDD().map(row -> {
-          InternalRow internalRow = row.copy();
+        return inputDataset.queryExecution().toRdd().toJavaRDD().map(internalRow -> {
           String recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType).toString();
           String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType).toString();
           HoodieKey key = new HoodieKey(recordKey, partitionPath);
-          return new HoodieSparkRecord(key, internalRow, structType);
+          return new HoodieSparkRecord(key, internalRow, false);
         });
       } else {
         throw new UnsupportedOperationException(recordType.name());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 03dcd2db4d..a77a14047d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -646,7 +646,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
     var recordsReadDF = spark.read.format("org.apache.hudi")
       .options(readOpts)
       .load(basePath + "/*/*")
-    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0)
+
+    assertEquals(0L, recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count())
 
     // Specify fieldType as TIMESTAMP
     writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts)
@@ -659,7 +660,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .options(readOpts)
       .load(basePath + "/*/*")
     val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd")))
-    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0)
+
+    assertEquals(0L, recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count())
 
     // Mixed fieldType
     writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts)
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index d4ca8bc402..d537dcef4b 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._
@@ -48,6 +49,12 @@ import scala.collection.mutable.ArrayBuffer
  */
 class Spark2Adapter extends SparkAdapter {
 
+  override def isColumnarBatchRow(r: InternalRow): Boolean = {
+    // NOTE: In Spark 2.x there's no [[ColumnarBatchRow]], instead [[MutableColumnarRow]] is leveraged
+    //       for vectorized reads
+    r.isInstanceOf[MutableColumnarRow]
+  }
+
   override def getCatalogUtils: HoodieCatalogUtils = {
     throw new UnsupportedOperationException("Catalog utilities are not supported in Spark 2.x");
   }
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index bed6ff3350..0a4bc289b3 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31HoodieParquetFileFormat}
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarUtils
 import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark31CatalogUtils, HoodieSpark31CatalystExpressionUtils, HoodieSpark31CatalystPlanUtils, HoodieSpark3CatalogUtils, SparkSession}
 
 /**
@@ -37,6 +38,8 @@ import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansU
  */
 class Spark3_1Adapter extends BaseSpark3Adapter {
 
+  override def isColumnarBatchRow(r: InternalRow): Boolean = ColumnarUtils.isColumnarBatchRow(r)
+
   override def getCatalogUtils: HoodieSpark3CatalogUtils = HoodieSpark31CatalogUtils
 
   override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark31CatalystExpressionUtils
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala
new file mode 100644
index 0000000000..e6015a65cb
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.vectorized
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+object ColumnarUtils {
+
+  /**
+   * Utility verifying whether provided instance of [[InternalRow]] is actually
+   * an instance of [[ColumnarBatchRow]]
+   *
+   * NOTE: This utility is required, since prior to Spark 3.3 [[ColumnarBatchRow]] is package-private
+   */
+  def isColumnarBatchRow(r: InternalRow): Boolean = r.isInstanceOf[ColumnarBatchRow]
+
+}
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index 8e15745d57..4c58f6b119 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -29,12 +29,15 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Sp
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
 import org.apache.spark.sql.parser.HoodieSpark3_2ExtendedSqlParser
 import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarUtils
 
 /**
  * Implementation of [[SparkAdapter]] for Spark 3.2.x branch
  */
 class Spark3_2Adapter extends BaseSpark3Adapter {
 
+  override def isColumnarBatchRow(r: InternalRow): Boolean = ColumnarUtils.isColumnarBatchRow(r)
+
   override def getCatalogUtils: HoodieSpark3CatalogUtils = HoodieSpark32CatalogUtils
 
   override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark32CatalystExpressionUtils
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala
new file mode 100644
index 0000000000..e6015a65cb
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/vectorized/ColumnarUtils.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.vectorized
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+object ColumnarUtils {
+
+  /**
+   * Utility verifying whether provided instance of [[InternalRow]] is actually
+   * an instance of [[ColumnarBatchRow]]
+   *
+   * NOTE: This utility is required, since prior to Spark 3.3 [[ColumnarBatchRow]] is package-private
+   */
+  def isColumnarBatchRow(r: InternalRow): Boolean = r.isInstanceOf[ColumnarBatchRow]
+
+}
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 72bc1d19f4..5f5414eb7e 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32PlusHoodieParquetFileFormat}
 import org.apache.spark.sql.parser.HoodieSpark3_3ExtendedSqlParser
 import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatchRow
 import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark33CatalogUtils, HoodieSpark33CatalystExpressionUtils, HoodieSpark33CatalystPlanUtils, HoodieSpark3CatalogUtils, SparkSession}
 
 /**
@@ -35,6 +36,8 @@ import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansU
  */
 class Spark3_3Adapter extends BaseSpark3Adapter {
 
+  override def isColumnarBatchRow(r: InternalRow): Boolean = r.isInstanceOf[ColumnarBatchRow]
+
   override def getCatalogUtils: HoodieSpark3CatalogUtils = HoodieSpark33CatalogUtils
 
   override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark33CatalystExpressionUtils