Posted to commits@hudi.apache.org by ak...@apache.org on 2022/10/15 01:38:10 UTC

[hudi] branch release-feature-rfc46 updated: Fix comment in RFC46 (#6745)

This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-feature-rfc46 by this push:
     new 19ed31f9be Fix comment in RFC46 (#6745)
19ed31f9be is described below

commit 19ed31f9bec718548fdb2475c7d0a2df3d6539cd
Author: komao <ma...@gmail.com>
AuthorDate: Sat Oct 15 09:38:02 2022 +0800

    Fix comment in RFC46 (#6745)
    
    * rename
    
    * add MetadataValues in updateMetadataValues
    
    * remove singleton in fileFactory
    
    * add truncateRecordKey
    
    * remove hoodieRecord#setData
    
    * rename HoodieAvroRecord
    
    * fix code style
    
    * fix HoodieSparkRecordSerializer
    
    * fix benchmark
    
    * fix SparkRecordUtils
    
    * instantiate HoodieWriteConfig on the fly
    
    * add test
    
    * fix HoodieSparkRecordSerializer. Replace Java's object serialization with kryo
    
    * add broadcast
    
    * fix comment
    
    * remove unnecessary broadcast
    
    * add unsafe check in spark record
    
    * fix getRecordColumnValues
    
    * remove spark.sql.parquet.writeLegacyFormat
    
    * fix unsafe projection
    
    * fix
    
    * pass external schema
    
    * update doc
    
    * rename back to HoodieAvroRecord
    
    * fix
    
    * remove comparable wrapper
    
    * fix comment
    
    * fix comment
    
    * fix comment
    
    * fix comment
    
    * simplify row copy
    
    * fix ParquetReaderIterator
    
    Co-authored-by: Shawy Geng <ge...@gmail.com>
    Co-authored-by: wangzixuan.wzxuan <wa...@bytedance.com>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  23 +--
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  26 +--
 .../org/apache/hudi/io/HoodieConcatHandle.java     |   9 +-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  12 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  73 +++----
 .../hudi/io/HoodieMergeHandleWithChangeLog.java    |  42 ++--
 .../apache/hudi/io/HoodieSortedMergeHandle.java    |  11 +-
 .../io/HoodieSortedMergeHandleWithChangeLog.java   |   6 +-
 .../strategy/ClusteringExecutionStrategy.java      |   2 +-
 .../hudi/table/action/commit/BaseMergeHelper.java  |   2 +-
 .../hudi/table/action/commit/BaseWriteHelper.java  |  10 +-
 .../table/action/commit/HoodieMergeHelper.java     |  10 +-
 .../table/action/commit/HoodieWriteHelper.java     |   7 +-
 .../hudi/io/FlinkConcatAndReplaceHandle.java       |   7 +-
 .../java/org/apache/hudi/io/FlinkConcatHandle.java |   7 +-
 .../FlinkMergeAndReplaceHandleWithChangeLog.java   |  39 +++-
 .../hudi/io/FlinkMergeHandleWithChangeLog.java     |  33 +++-
 .../hudi/table/action/commit/FlinkMergeHelper.java |  10 +-
 .../hudi/table/action/commit/FlinkWriteHelper.java |   6 +-
 .../run/strategy/JavaExecutionStrategy.java        |   2 +-
 .../hudi/table/action/commit/JavaMergeHelper.java  |  10 +-
 .../hudi/table/action/commit/JavaWriteHelper.java  |   6 +-
 .../MultipleSparkJobExecutionStrategy.java         |  12 +-
 .../strategy/SingleSparkJobExecutionStrategy.java  |  14 +-
 .../hudi/commmon/model/HoodieSparkRecord.java      | 219 +++++++++++++--------
 .../bulkinsert/RDDConsistentBucketPartitioner.java |   8 +-
 .../RDDCustomColumnsSortPartitioner.java           |   2 +-
 .../bulkinsert/RDDSpatialCurveSortPartitioner.java |   4 +-
 .../io/storage/HoodieSparkFileReaderFactory.java   |  11 --
 .../io/storage/HoodieSparkFileWriterFactory.java   |   9 -
 .../hudi/io/storage/HoodieSparkParquetReader.java  |   4 +-
 .../bootstrap/ParquetBootstrapMetadataHandler.java |  14 +-
 .../apache/hudi/util/HoodieSparkRecordUtils.java   |  59 +-----
 .../org/apache/hudi/HoodieInternalRowUtils.scala   |  67 ++-----
 .../spark/sql/hudi/SparkStructTypeSerializer.scala | 157 ---------------
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   7 +-
 .../hudi/execution/TestBoundedInMemoryQueue.java   |   4 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |   2 +-
 .../hudi/common/model/HoodieAvroIndexedRecord.java |  38 ++--
 .../apache/hudi/common/model/HoodieAvroRecord.java |  50 +++--
 .../hudi/common/model/HoodieAvroRecordMerger.java  |  36 ++--
 .../hudi/common/model/HoodieEmptyRecord.java       |  49 ++---
 .../org/apache/hudi/common/model/HoodieRecord.java |  28 +--
 .../model/HoodieRecordCompatibilityInterface.java  |  11 +-
 .../hudi/common/model/HoodieRecordMerger.java      |   5 +-
 .../apache/hudi/common/model/MetadataValues.java   |  66 +++++++
 .../hudi/common/table/HoodieTableConfig.java       |  11 +-
 .../table/log/AbstractHoodieLogRecordReader.java   |   3 +
 .../table/log/HoodieMergedLogRecordScanner.java    |  20 +-
 .../table/log/HoodieUnMergedLogRecordScanner.java  |   2 +-
 .../common/table/log/block/HoodieDataBlock.java    |   2 +-
 .../table/log/block/HoodieHFileDataBlock.java      |   5 +-
 .../org/apache/hudi/common/util/ConfigUtils.java   |  12 ++
 .../apache/hudi/common/util/HoodieRecordUtils.java |   2 +-
 .../hudi/common/util/ParquetReaderIterator.java    |  21 +-
 .../hudi/common/util/SerializationUtils.java       |  18 --
 .../org/apache/hudi/common/util/StringUtils.java   |   2 -
 .../hudi/common/util/collection/FlatLists.java     | 148 ++++++++++++++
 .../io/storage/HoodieAvroFileReaderFactory.java    |  11 --
 .../hudi/io/storage/HoodieAvroFileWriter.java      |   5 +-
 .../io/storage/HoodieAvroFileWriterFactory.java    |  11 --
 .../hudi/io/storage/HoodieFileReaderFactory.java   |   9 +-
 .../hudi/io/storage/HoodieFileWriterFactory.java   |   9 +-
 .../apache/hudi/configuration/FlinkOptions.java    |   6 +-
 .../org/apache/hudi/sink/StreamWriteFunction.java  |   6 +-
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |   5 +-
 .../org/apache/hudi/table/format/FormatUtils.java  |   2 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |   7 +-
 .../apache/hudi/hadoop/TestInputPathHandler.java   |   4 +-
 .../org/apache/hudi/HoodieSparkRecordMerger.java   |  14 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  12 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  18 +-
 .../scala/org/apache/hudi/LogFileIterator.scala    |  42 ++--
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   |   7 +-
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala |  12 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |   2 +
 .../SparkFullBootstrapDataProviderBase.java        |   9 +-
 .../apache/hudi/TestHoodieInternalRowUtils.scala   |  12 --
 .../apache/hudi/functional/TestCOWDataSource.scala |   2 +-
 .../apache/hudi/functional/TestMORDataSource.scala |  15 +-
 .../ReadAndWriteWithoutAvroBenchmark.scala         |  75 ++++---
 .../spark/sql/hudi/TestHoodieOptionConfig.scala    |   7 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |   5 +-
 rfc/rfc-46/rfc-46.md                               |   4 +-
 84 files changed, 927 insertions(+), 859 deletions(-)
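At a high level, the patch below replaces the old Map<String, String> plumbing for Hudi meta fields with the new MetadataValues holder and its updateMetadataValues hook. A minimal sketch of the new call pattern, with method names taken from the hunks that follow; the surrounding variables (record, fileName, partitionPath, instantTime, props, writeSchemaWithMetaFields) are assumed in scope purely for illustration:

    MetadataValues metadataValues = new MetadataValues();
    metadataValues.setFileName(fileName);
    metadataValues.setPartitionPath(partitionPath);
    metadataValues.setRecordKey(record.getRecordKey());
    metadataValues.setCommitTime(instantTime);
    // updateMetadataValues returns a record with the requested meta fields populated
    HoodieRecord updated = record.updateMetadataValues(writeSchemaWithMetaFields, props, metadataValues);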

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index a5931c2e0a..1d0a285912 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -139,8 +139,8 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
       .key("hoodie.datasource.write.merger.strategy")
-      .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
-      .withDocumentation("Id of merger strategy.  Hudi will pick RecordMergers in hoodie.datasource.write.merger.impls which has the same merger strategy id");
+      .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+      .withDocumentation("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in hoodie.datasource.write.merger.impls which has the same merger strategy id");
 
   public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = ConfigProperty
       .key("hoodie.datasource.write.keygenerator.class")
@@ -517,7 +517,6 @@ public class HoodieWriteConfig extends HoodieConfig {
   private HoodieCommonConfig commonConfig;
   private HoodieStorageConfig storageConfig;
   private EngineType engineType;
-  private HoodieRecordMerger recordMerger;
 
   /**
    * @deprecated Use {@link #TBL_NAME} and its methods instead
@@ -894,7 +893,6 @@ public class HoodieWriteConfig extends HoodieConfig {
     super();
     this.engineType = EngineType.SPARK;
     this.clientSpecifiedViewStorageConfig = null;
-    applyMergerClass();
   }
 
   protected HoodieWriteConfig(EngineType engineType, Properties props) {
@@ -902,7 +900,6 @@ public class HoodieWriteConfig extends HoodieConfig {
     Properties newProps = new Properties();
     newProps.putAll(props);
     this.engineType = engineType;
-    applyMergerClass();
     this.consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
     this.fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().fromProperties(newProps).build();
     this.clientSpecifiedViewStorageConfig = FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
@@ -914,15 +911,6 @@ public class HoodieWriteConfig extends HoodieConfig {
     this.storageConfig = HoodieStorageConfig.newBuilder().fromProperties(props).build();
   }
 
-  private void applyMergerClass() {
-    List<String> mergers = getSplitStringsOrDefault(MERGER_IMPLS).stream()
-        .map(String::trim)
-        .distinct()
-        .collect(Collectors.toList());
-    String mergerStrategy = getString(MERGER_STRATEGY);
-    this.recordMerger = HoodieRecordUtils.generateRecordMerger(getString(BASE_PATH), engineType, mergers, mergerStrategy);
-  }
-
   public static HoodieWriteConfig.Builder newBuilder() {
     return new Builder();
   }
@@ -935,7 +923,12 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public HoodieRecordMerger getRecordMerger() {
-    return recordMerger;
+    List<String> mergers = getSplitStringsOrDefault(MERGER_IMPLS).stream()
+        .map(String::trim)
+        .distinct()
+        .collect(Collectors.toList());
+    String mergerStrategy = getString(MERGER_STRATEGY);
+    return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH), engineType, mergers, mergerStrategy);
   }
 
   public String getSchema() {
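Behavioral note on the hunks above: the record merger is no longer cached in a HoodieWriteConfig field at construction time; every getRecordMerger() call now builds it from hoodie.datasource.write.merger.impls and hoodie.datasource.write.merger.strategy via HoodieRecordUtils.createRecordMerger. A hedged usage sketch follows; the property keys and DEFAULT_MERGER_STRATEGY_UUID come from the hunks above, while the builder methods and the HoodieSparkRecordMerger class name are assumptions drawn from elsewhere in the codebase, not from this file:

    Properties props = new Properties();
    props.setProperty("hoodie.datasource.write.merger.impls", "org.apache.hudi.HoodieSparkRecordMerger");
    props.setProperty("hoodie.datasource.write.merger.strategy", HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withProperties(props)
        .build();
    // instantiated on the fly from the two properties above, not read from a cached field
    HoodieRecordMerger merger = writeConfig.getRecordMerger();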
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 075d763261..564d63ba77 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -39,6 +38,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -253,21 +253,21 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
   }
 
   private HoodieRecord populateMetadataFields(HoodieRecord<T> hoodieRecord, Schema schema, Properties prop) throws IOException {
-    Map<String, String> metadataValues = new HashMap<>();
-    String seqId =
-        HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
+    MetadataValues metadataValues = new MetadataValues();
     if (config.populateMetaFields()) {
-      metadataValues.put(HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName(), fileId);
-      metadataValues.put(HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.getFieldName(), partitionPath);
-      metadataValues.put(HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName(), hoodieRecord.getRecordKey());
-      metadataValues.put(HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName(), instantTime);
-      metadataValues.put(HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD.getFieldName(), seqId);
+      String seqId =
+          HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
+      metadataValues.setFileName(fileId);
+      metadataValues.setPartitionPath(partitionPath);
+      metadataValues.setRecordKey(hoodieRecord.getRecordKey());
+      metadataValues.setCommitTime(instantTime);
+      metadataValues.setCommitSeqno(seqId);
     }
     if (config.allowOperationMetadataField()) {
-      metadataValues.put(HoodieRecord.HoodieMetadataField.OPERATION_METADATA_FIELD.getFieldName(), hoodieRecord.getOperation().getName());
+      metadataValues.setOperation(hoodieRecord.getOperation().getName());
     }
 
-    return hoodieRecord.updateValues(schema, prop, metadataValues);
+    return hoodieRecord.updateMetadataValues(schema, prop, metadataValues);
   }
 
   private void initNewStatus() {
@@ -380,7 +380,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
 
       List<IndexedRecord> indexedRecords = new LinkedList<>();
       for (HoodieRecord hoodieRecord : recordList) {
-        indexedRecords.add(((HoodieAvroIndexedRecord) hoodieRecord.toIndexedRecord(tableSchema, config.getProps()).get()).getData());
+        indexedRecords.add(hoodieRecord.toIndexedRecord(tableSchema, config.getProps()).get().getData());
       }
 
       Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangesMetadataMap =
@@ -511,7 +511,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
       record.seal();
     }
     // fetch the ordering val first in case the record was deflated.
-    final Comparable<?> orderingVal = record.getOrderingValue(config.getProps());
+    final Comparable<?> orderingVal = record.getOrderingValue(tableSchema, config.getProps());
     Option<HoodieRecord> indexedRecord = prepareRecord(record);
     if (indexedRecord.isPresent()) {
       // Skip the ignored record.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
index 4f6b0428b2..26374b32fc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
@@ -93,11 +93,11 @@ public class HoodieConcatHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O
    */
   @Override
   public void write(HoodieRecord oldRecord) {
-    String key = oldRecord.getRecordKey(keyGeneratorOpt);
-    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+    Schema oldSchema = config.populateMetaFields() ? tableSchemaWithMetaFields : tableSchema;
+    String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
     try {
       // NOTE: We're enforcing preservation of the record metadata to keep existing semantic
-      writeToFile(new HoodieKey(key, partitionPath), oldRecord, schema, config.getPayloadConfig().getProps(), true);
+      writeToFile(new HoodieKey(key, partitionPath), oldRecord, oldSchema, config.getPayloadConfig().getProps(), true);
     } catch (IOException | RuntimeException e) {
       String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
           key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
@@ -109,6 +109,7 @@ public class HoodieConcatHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O
 
   @Override
   protected void writeIncomingRecords() throws IOException {
+    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
     while (recordItr.hasNext()) {
       HoodieRecord<T> record = recordItr.next();
       if (needsUpdateLocation()) {
@@ -116,7 +117,7 @@ public class HoodieConcatHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O
         record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
         record.seal();
       }
-      writeInsertRecord(record);
+      writeInsertRecord(record, schema);
     }
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index d6269a4fc9..d3782e9f20 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -27,11 +27,11 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
@@ -143,7 +143,9 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
         } else {
           rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields);
         }
-        rewriteRecord = rewriteRecord.updateValues(writeSchemaWithMetaFields, config.getProps(), Collections.singletonMap(HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName(), path.getName()));
+        MetadataValues metadataValues = new MetadataValues();
+        metadataValues.setFileName(path.getName());
+        rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues);
         if (preserveMetadata) {
           fileWriter.write(record.getRecordKey(), rewriteRecord, writeSchemaWithMetaFields);
         } else {
@@ -185,11 +187,7 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     while (keyIterator.hasNext()) {
       final String key = keyIterator.next();
       HoodieRecord<T> record = recordMap.get(key);
-      if (useWriterSchema) {
-        write(record, tableSchemaWithMetaFields, config.getProps());
-      } else {
-        write(record, useWriterSchema ? tableSchemaWithMetaFields : tableSchema, config.getProps());
-      }
+      write(record, useWriterSchema ? tableSchemaWithMetaFields : tableSchema, config.getProps());
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 0933d9f28b..28377918b4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -27,16 +27,17 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -267,66 +268,64 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
         + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp) throws IOException {
+  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema) throws IOException {
     boolean isDelete = false;
-    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
-    if (combineRecordOp.isPresent()) {
+    if (combineRecordOpt.isPresent()) {
       updatedRecordsWritten++;
-      if (oldRecord.getData() != combineRecordOp.get().getData()) {
+      if (oldRecord.getData() != combineRecordOpt.get().getData()) {
         // the incoming record is chosen
-        isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
+        isDelete = HoodieOperation.isDelete(newRecord.getOperation());
       } else {
         // the incoming record is dropped
         return false;
       }
     }
-    return writeRecord(hoodieRecord, combineRecordOp, schema, config.getPayloadConfig().getProps(), isDelete);
+    return writeRecord(newRecord, combineRecordOpt, writerSchema, config.getPayloadConfig().getProps(), isDelete);
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
-    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+  protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema) throws IOException {
     // just skip the ignored record
-    if (hoodieRecord.shouldIgnore(schema, config.getProps())) {
+    if (newRecord.shouldIgnore(schema, config.getProps())) {
       return;
     }
-    writeInsertRecord(hoodieRecord, Option.of(hoodieRecord), schema, config.getProps());
+    writeInsertRecord(newRecord, schema, config.getProps());
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> insertRecord, Schema schema, Properties prop)
+  protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, Properties prop)
       throws IOException {
-    if (writeRecord(hoodieRecord, insertRecord, schema, prop, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
+    if (writeRecord(newRecord, Option.of(newRecord), schema, prop, HoodieOperation.isDelete(newRecord.getOperation()))) {
       insertRecordsWritten++;
     }
   }
 
-  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop) throws IOException {
-    return writeRecord(hoodieRecord, combineRecord, schema, prop, false);
+  protected boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop) throws IOException {
+    return writeRecord(newRecord, combineRecord, schema, prop, false);
   }
 
-  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop, boolean isDelete) throws IOException {
-    Option recordMetadata = hoodieRecord.getMetadata();
-    if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
+  protected boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop, boolean isDelete) throws IOException {
+    Option recordMetadata = newRecord.getMetadata();
+    if (!partitionPath.equals(newRecord.getPartitionPath())) {
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
-          + hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
-      writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
+          + newRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
+      writeStatus.markFailure(newRecord, failureEx, recordMetadata);
       return false;
     }
     try {
       if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, config.getProps()) && !isDelete) {
-        writeToFile(hoodieRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction);
+        writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction);
         recordsWritten++;
       } else {
         recordsDeleted++;
       }
-      writeStatus.markSuccess(hoodieRecord, recordMetadata);
+      writeStatus.markSuccess(newRecord, recordMetadata);
       // deflate record payload after recording success. This will help users access payload as a
       // part of marking
       // record successful.
-      hoodieRecord.deflate();
+      newRecord.deflate();
       return true;
     } catch (Exception e) {
-      LOG.error("Error writing record  " + hoodieRecord, e);
-      writeStatus.markFailure(hoodieRecord, e, recordMetadata);
+      LOG.error("Error writing record  " + newRecord, e);
+      writeStatus.markFailure(newRecord, e, recordMetadata);
     }
     return false;
   }
@@ -335,21 +334,24 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
    * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
    */
   public void write(HoodieRecord<T> oldRecord) {
-    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+    Schema oldSchema = config.populateMetaFields() ? tableSchemaWithMetaFields : tableSchema;
+    Schema newSchema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
     boolean copyOldRecord = true;
-    String key = oldRecord.getRecordKey(keyGeneratorOpt);
+    String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
     TypedProperties props = config.getPayloadConfig().getProps();
     if (keyToNewRecords.containsKey(key)) {
       // If we have duplicate records that we are updating, then the hoodie record will be deflated after
       // writing the first record. So make a copy of the record to be merged
-      HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
+      HoodieRecord<T> newRecord = keyToNewRecords.get(key).newInstance();
       try {
-        Option<HoodieRecord> combinedRecord = recordMerger.merge(oldRecord, hoodieRecord, schema, props);
+        Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
+        Schema combineRecordSchema = mergeResult.map(Pair::getRight).orElse(null);
+        Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);
 
-        if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(schema, props)) {
+        if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
           // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
           copyOldRecord = true;
-        } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedRecord)) {
+        } else if (writeUpdateRecord(newRecord, oldRecord, combinedRecord, combineRecordSchema)) {
           /*
            * ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
            * write the combined new value
@@ -368,7 +370,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     if (copyOldRecord) {
       try {
         // NOTE: We're enforcing preservation of the record metadata to keep existing semantic
-        writeToFile(new HoodieKey(key, partitionPath), oldRecord, schema, props, true);
+        writeToFile(new HoodieKey(key, partitionPath), oldRecord, oldSchema, props, true);
       } catch (IOException | RuntimeException e) {
         String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
                 key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
@@ -386,7 +388,9 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     } else {
       rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields);
     }
-    rewriteRecord = rewriteRecord.updateValues(writeSchemaWithMetaFields, prop, Collections.singletonMap(HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName(), newFilePath.getName()));
+    MetadataValues metadataValues = new MetadataValues();
+    metadataValues.setFileName(newFilePath.getName());
+    rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues);
     if (shouldPreserveRecordMetadata) {
       // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
       //       file holding this record even in cases when overall metadata is preserved
@@ -400,10 +404,11 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     // write out any pending records (this can happen when inserts are turned into updates)
     Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
         ? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();
+    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
     while (newRecordsItr.hasNext()) {
       HoodieRecord<T> hoodieRecord = newRecordsItr.next();
       if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
-        writeInsertRecord(hoodieRecord);
+        writeInsertRecord(hoodieRecord, schema);
       }
     }
   }
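The merge path in this handle now threads both the old and new record schemas through HoodieRecordMerger and receives back the combined record together with the schema it conforms to. A minimal sketch of the new contract, with names mirroring the write(HoodieRecord) hunk above (illustrative only, not additional code from this commit):

    Option<Pair<HoodieRecord, Schema>> mergeResult =
        recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
    Schema combinedSchema = mergeResult.map(Pair::getRight).orElse(null);
    Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);
    if (combinedRecord.isPresent() && !combinedRecord.get().shouldIgnore(combinedSchema, props)) {
      // hand the combined record to writeUpdateRecord(...) with combinedSchema as the writer schema
    }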
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index 6e8fda0b10..fda1435345 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -30,10 +30,12 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -44,6 +46,9 @@ import java.util.Map;
  * A merge handle that supports logging change logs.
  */
 public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieMergeHandleWithChangeLog.class);
+
   protected final HoodieCDCLogger cdcLogger;
 
   public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -75,24 +80,35 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandl
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema)
       throws IOException {
-    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, combineRecordOp);
+    // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
+    Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
+    HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+    final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
     if (result) {
-      boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
-      cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord) oldRecord).getData(), isDelete ? Option.empty() : combineRecordOp.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
+      boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
+      Option<IndexedRecord> avroOpt = savedCombineRecordOp
+          .flatMap(r -> {
+            try {
+              return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
+                  .map(HoodieAvroIndexedRecord::getData);
+            } catch (IOException e) {
+              LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
+              return Option.empty();
+            }
+          });
+      cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
     }
     return result;
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
-    // Get the data before deflated
-    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
-    Option<IndexedRecord> recordOption = hoodieRecord.toIndexedRecord(schema, this.config.getProps())
-        .map(HoodieRecord::getData);
-    super.writeInsertRecord(hoodieRecord);
-    if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
-      cdcLogger.put(hoodieRecord, null, recordOption);
+  protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema) throws IOException {
+    // TODO Remove these unnecessary newInstance invocations
+    HoodieRecord<T> savedRecord = newRecord.newInstance();
+    super.writeInsertRecord(newRecord, schema);
+    if (!HoodieOperation.isDelete(newRecord.getOperation())) {
+      cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema, config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData));
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 60c6a2da7f..a4d7b5efa5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.io;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -72,7 +73,9 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I,
    */
   @Override
   public void write(HoodieRecord oldRecord) {
-    String key = oldRecord.getRecordKey(keyGeneratorOpt);
+    Schema oldSchema = config.populateMetaFields() ? tableSchemaWithMetaFields : tableSchema;
+    Schema newSchema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+    String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
 
     // To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
     // the oldRecord's key.
@@ -89,11 +92,7 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I,
         throw new HoodieUpsertException("Insert/Update not in sorted order");
       }
       try {
-        if (useWriterSchemaForCompaction) {
-          writeRecord(hoodieRecord, Option.of(hoodieRecord), tableSchemaWithMetaFields, config.getProps());
-        } else {
-          writeRecord(hoodieRecord, Option.of(hoodieRecord), tableSchema, config.getProps());
-        }
+        writeRecord(hoodieRecord, Option.of(hoodieRecord), newSchema, config.getProps());
         insertRecordsWritten++;
         writtenRecordKeys.add(keyToPreWrite);
       } catch (IOException e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
index 727765b3e2..819cfd0754 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
@@ -53,10 +53,10 @@ public class HoodieSortedMergeHandleWithChangeLog<T, I, K, O> extends HoodieMerg
     super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
   }
 
-  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> insertRecord, Schema schema, Properties props)
+  protected boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> insertRecord, Schema schema, Properties props)
       throws IOException {
-    final boolean result = super.writeRecord(hoodieRecord, insertRecord, schema, props);
-    this.cdcLogger.put(hoodieRecord, null, insertRecord.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
+    final boolean result = super.writeRecord(newRecord, insertRecord, schema, props);
+    this.cdcLogger.put(newRecord, null, insertRecord.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
     return result;
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
index d2de068838..4b7240d432 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
@@ -39,7 +39,7 @@ public abstract class ClusteringExecutionStrategy<T, I, K, O> implements Seriali
 
   private final HoodieTable<T, I, K, O> hoodieTable;
   private final transient HoodieEngineContext engineContext;
-  private final HoodieWriteConfig writeConfig;
+  protected final HoodieWriteConfig writeConfig;
   protected final HoodieRecordType recordType;
 
   public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index a3a3188df9..4c5db03d35 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -83,7 +83,7 @@ public abstract class BaseMergeHelper<T, I, K, O> {
   @Nonnull
   private static HoodieRecord mergeRecords(HoodieRecord left, HoodieRecord right, Schema targetSchema) {
     try {
-      return left.mergeWith(right, targetSchema);
+      return left.joinWith(right, targetSchema);
     } catch (IOException e) {
       throw new HoodieIOException("Failed to merge records", e);
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 0b96529379..622ed4573e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.exception.HoodieUpsertException;
@@ -87,6 +88,11 @@ public abstract class BaseWriteHelper<T, I, K, O, R> {
     return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
   }
 
-  public abstract I deduplicateRecords(
-      I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merge);
+  public I deduplicateRecords(
+      I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger) {
+    return deduplicateRecordsInternal(records, index, parallelism, schema, HoodieAvroRecordMerger.withDeDuping(props), merger);
+  }
+
+  protected abstract I deduplicateRecordsInternal(
+      I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merger);
 }
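The change above turns deduplicateRecords into a template method: the de-duping hint is applied once in the base class and each engine now only supplies deduplicateRecordsInternal (see the Spark, Flink and Java helper hunks further down). A schematic reduction of the new contract, not the actual class body:

    public abstract class WriteHelperSketch<I> {
      public I deduplicateRecords(I records, HoodieIndex<?, ?> index, int parallelism,
                                  String schema, Properties props, HoodieRecordMerger merger) {
        // de-duping property applied once here, for every engine
        return deduplicateRecordsInternal(records, index, parallelism, schema,
            HoodieAvroRecordMerger.withDeDuping(props), merger);
      }

      protected abstract I deduplicateRecordsInternal(I records, HoodieIndex<?, ?> index,
          int parallelism, String schema, Properties props, HoodieRecordMerger merger);
    }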
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 49edb981a5..e4fb3face4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -43,7 +43,6 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -97,7 +96,7 @@ public class HoodieMergeHelper<T> extends
       readSchema = mergeHandle.getWriterSchemaWithMetaFields();
     }
 
-    BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+    BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void> wrapper = null;
     Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
     boolean needToReWriteRecord = false;
     Map<String, String> renameCols = new HashMap<>();
@@ -139,13 +138,14 @@ public class HoodieMergeHelper<T> extends
         }
       }
 
-      wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
+      wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
           new UpdateHandler(mergeHandle), record -> {
+        HoodieRecord recordCopy = record.copy();
         if (!externalSchemaTransformation) {
-          return record;
+          return recordCopy;
         }
         try {
-          return ((HoodieRecord) record).rewriteRecord(writerSchema, new Properties(), readerSchema);
+          return recordCopy.rewriteRecord(writerSchema, new Properties(), readerSchema);
         } catch (IOException e) {
           throw new HoodieException(String.format("Failed to rewrite record. WriterSchema: %s; ReaderSchema: %s", writerSchema, readerSchema), e);
         }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index bd495f69da..1893132e7d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -53,8 +53,8 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
   }
 
   @Override
-  public HoodieData<HoodieRecord<T>> deduplicateRecords(
-      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger recordMerger) {
+  public HoodieData<HoodieRecord<T>> deduplicateRecordsInternal(
+      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
     boolean isIndexingGlobal = index.isGlobal();
     final SerializableSchema schema = new SerializableSchema(schemaStr);
     // Auto-tunes the parallelism for reduce transformation based on the number of data partitions
@@ -68,8 +68,7 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
     }).reduceByKey((rec1, rec2) -> {
       HoodieRecord<T> reducedRecord;
       try {
-        // Precombine do not need schema and do not return null
-        reducedRecord =  recordMerger.merge(rec1, rec2, schema.get(), props).get();
+        reducedRecord =  merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft();
       } catch (IOException e) {
         throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
       }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java
index 662e8381e6..4faa7b5fa6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatAndReplaceHandle.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.io;
 
+import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -56,7 +57,8 @@ public class FlinkConcatAndReplaceHandle<T, I, K, O>
    */
   @Override
   public void write(HoodieRecord oldRecord) {
-    String key = oldRecord.getRecordKey(keyGeneratorOpt);
+    Schema oldSchema = config.populateMetaFields() ? tableSchemaWithMetaFields : tableSchema;
+    String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
     try {
       fileWriter.write(key, oldRecord, writeSchema);
     } catch (IOException | RuntimeException e) {
@@ -70,9 +72,10 @@ public class FlinkConcatAndReplaceHandle<T, I, K, O>
 
   @Override
   protected void writeIncomingRecords() throws IOException {
+    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
     while (recordItr.hasNext()) {
       HoodieRecord<T> record = recordItr.next();
-      writeInsertRecord(record);
+      writeInsertRecord(record, schema);
     }
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
index 8e4fb50e52..e23aa6e74a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.io;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -55,7 +56,8 @@ public class FlinkConcatHandle<T, I, K, O>
    */
   @Override
   public void write(HoodieRecord oldRecord) {
-    String key = oldRecord.getRecordKey(keyGeneratorOpt);
+    Schema oldSchema = config.populateMetaFields() ? tableSchemaWithMetaFields : tableSchema;
+    String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
     try {
       fileWriter.write(key, oldRecord, writeSchema);
     } catch (IOException | RuntimeException e) {
@@ -69,9 +71,10 @@ public class FlinkConcatHandle<T, I, K, O>
 
   @Override
   protected void writeIncomingRecords() throws IOException {
+    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
     while (recordItr.hasNext()) {
       HoodieRecord<T> record = recordItr.next();
-      writeInsertRecord(record);
+      writeInsertRecord(record, schema);
     }
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index fa14ba0418..7dde24ec8c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -28,7 +28,11 @@ import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.Path;
 
@@ -44,6 +48,9 @@ import java.util.List;
  */
 public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O>
     extends FlinkMergeAndReplaceHandle<T, I, K, O> {
+
+  private static final Logger LOG = LogManager.getLogger(FlinkMergeAndReplaceHandleWithChangeLog.class);
+
   private final HoodieCDCLogger cdcLogger;
 
   public FlinkMergeAndReplaceHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -59,20 +66,36 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O>
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema)
       throws IOException {
-    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, combineRecordOp);
+    // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
+    Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
+    HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+    final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
     if (result) {
-      boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
-      cdcLogger.put(hoodieRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : combineRecordOp.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
+      boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
+      Option<IndexedRecord> avroOpt = savedCombineRecordOp
+          .flatMap(r -> {
+            try {
+              return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
+                  .map(HoodieAvroIndexedRecord::getData);
+            } catch (IOException e) {
+              LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
+              return Option.empty();
+            }
+          });
+      cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
     }
     return result;
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
-    super.writeInsertRecord(hoodieRecord);
-    if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
-      cdcLogger.put(hoodieRecord, null, Option.of((GenericRecord) hoodieRecord.getData()));
+  protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema) throws IOException {
+    // TODO Remove these unnecessary newInstance invocations
+    HoodieRecord<T> savedRecord = newRecord.newInstance();
+    super.writeInsertRecord(newRecord, schema);
+    if (!HoodieOperation.isDelete(newRecord.getOperation())) {
+      cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema, config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData));
+      newRecord.deflate();
     }
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index b85c7c270f..4c9a7484c1 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -62,20 +64,35 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+  protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema)
       throws IOException {
-    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, combineRecordOp);
+    // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
+    Option<HoodieRecord> savedCombineRecordOp = combineRecordOpt.map(HoodieRecord::newInstance);
+    HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+    final boolean result = super.writeUpdateRecord(newRecord, oldRecord, combineRecordOpt, writerSchema);
     if (result) {
-      boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
-      cdcLogger.put(hoodieRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : combineRecordOp.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
+      boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
+      Option<IndexedRecord> avroOpt = savedCombineRecordOp
+          .flatMap(r -> {
+            try {
+              return r.toIndexedRecord(writerSchema, config.getPayloadConfig().getProps())
+                  .map(HoodieAvroIndexedRecord::getData);
+            } catch (IOException e) {
+              LOG.error("Fail to get indexRecord from " + savedCombineRecordOp, e);
+              return Option.empty();
+            }
+          });
+      cdcLogger.put(newRecord, (GenericRecord) savedOldRecord.getData(), isDelete ? Option.empty() : avroOpt);
     }
     return result;
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
-    super.writeInsertRecord(hoodieRecord);
-    if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
-      cdcLogger.put(hoodieRecord, null, Option.of((GenericRecord) hoodieRecord.getData()));
+  protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema) throws IOException {
+    // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
+    HoodieRecord<T> savedRecord = newRecord.newInstance();
+    super.writeInsertRecord(newRecord, schema);
+    if (!HoodieOperation.isDelete(newRecord.getOperation())) {
+      cdcLogger.put(newRecord, null, savedRecord.toIndexedRecord(schema, config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData));
     }
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index a15a438c7a..06886dbb07 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -32,7 +32,6 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
@@ -82,7 +81,7 @@ public class FlinkMergeHelper<T> extends BaseMergeHelper<T, List<HoodieRecord<T>
       readSchema = mergeHandle.getWriterSchemaWithMetaFields();
     }
 
-    BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+    BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void> wrapper = null;
     try {
       final Iterator<HoodieRecord> readerIterator;
       if (baseFile.getBootstrapBaseFile().isPresent()) {
@@ -91,13 +90,14 @@ public class FlinkMergeHelper<T> extends BaseMergeHelper<T, List<HoodieRecord<T>
         readerIterator = reader.getRecordIterator(readSchema);
       }
 
-      wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
+      wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
           Option.of(new UpdateHandler(mergeHandle)), record -> {
+        HoodieRecord recordCopy = record.copy();
         if (!externalSchemaTransformation) {
-          return record;
+          return recordCopy;
         }
         try {
-          return ((HoodieRecord) record).rewriteRecord(writerSchema, new Properties(), readerSchema);
+          return recordCopy.rewriteRecord(writerSchema, new Properties(), readerSchema);
         } catch (IOException e) {
           throw new HoodieException(e);
         }
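
The transformer passed to BoundedInMemoryExecutor now copies every record before it is buffered, because the file reader may reuse its backing row object; only then is the optional rewrite to the writer schema applied. A minimal sketch of that transform, assuming the copy()/rewriteRecord signatures shown above (class name is illustrative):

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.exception.HoodieException;

import java.io.IOException;
import java.util.Properties;
import java.util.function.Function;

final class MergeBufferTransformSketch {
  // Copy first (the reader may reuse the backing row), then optionally rewrite to the writer schema.
  static Function<HoodieRecord, HoodieRecord> transform(boolean externalSchemaTransformation,
                                                        Schema writerSchema, Schema readerSchema) {
    return record -> {
      HoodieRecord recordCopy = record.copy();
      if (!externalSchemaTransformation) {
        return recordCopy;
      }
      try {
        return recordCopy.rewriteRecord(writerSchema, new Properties(), readerSchema);
      } catch (IOException e) {
        throw new HoodieException(e);
      }
    };
  }
}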
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 4993c43608..db9422336d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -91,8 +91,8 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
   }
 
   @Override
-  public List<HoodieRecord<T>> deduplicateRecords(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger recordMerger) {
+  public List<HoodieRecord<T>> deduplicateRecordsInternal(
+      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
     // If index used is global, then records are expected to differ in their partitionPath
     Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
         .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
@@ -103,7 +103,7 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
       HoodieRecord<T> reducedRecord;
       try {
        // Precombine does not need a schema and does not return null
-        reducedRecord =  recordMerger.merge(rec1, rec2, null, props).get();
+        reducedRecord =  merger.merge(rec1, schema, rec2, schema, props).get().getLeft();
       } catch (IOException e) {
         throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
       }
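
The precombine path reflects the new HoodieRecordMerger contract: merge takes a schema for each input and returns a pair of (merged record, schema), of which only the record is kept here. A hedged sketch of the reduce step, assuming that contract (names are illustrative):

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.exception.HoodieException;

import java.io.IOException;
import java.util.Properties;

final class PrecombineSketch {
  // merge(rec1, schema, rec2, schema, props) yields Option<Pair<record, schema>>; keep the record only.
  static HoodieRecord precombine(HoodieRecordMerger merger, HoodieRecord rec1, HoodieRecord rec2,
                                 Schema schema, Properties props) {
    try {
      return merger.merge(rec1, schema, rec2, schema, props).get().getLeft();
    } catch (IOException e) {
      throw new HoodieException(String.format("Error merging records %s and %s", rec1, rec2), e);
    }
  }
}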
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index fe398238da..e62643ad4e 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -216,7 +216,7 @@ public abstract class JavaExecutionStrategy<T>
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
         HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
         Iterator<HoodieRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
-        recordIterator.forEachRemaining(record -> records.add(record.wrapIntoHoodieRecordPayloadWithKeyGen(new Properties(), Option.empty())));
+        recordIterator.forEachRemaining(record -> records.add(record.copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, new Properties(), Option.empty())));
       } catch (IOException e) {
         throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
             + " and " + clusteringOp.getDeltaFilePaths(), e);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
index 9593b5e72f..54a044ae26 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java
@@ -32,7 +32,6 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
@@ -80,7 +79,7 @@ public class JavaMergeHelper<T> extends BaseMergeHelper<T, List<HoodieRecord<T>>
       readSchema = mergeHandle.getWriterSchemaWithMetaFields();
     }
 
-    BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+    BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void> wrapper = null;
     try {
       final Iterator<HoodieRecord> readerIterator;
       if (baseFile.getBootstrapBaseFile().isPresent()) {
@@ -89,13 +88,14 @@ public class JavaMergeHelper<T> extends BaseMergeHelper<T, List<HoodieRecord<T>>
         readerIterator = reader.getRecordIterator(readSchema);
       }
 
-      wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
+      wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
           Option.of(new UpdateHandler(mergeHandle)), record -> {
+        HoodieRecord recordCopy = record.copy();
         if (!externalSchemaTransformation) {
-          return record;
+          return recordCopy;
         }
         try {
-          return ((HoodieRecord) record).rewriteRecord(writerSchema, new Properties(), readerSchema);
+          return recordCopy.rewriteRecord(writerSchema, new Properties(), readerSchema);
         } catch (IOException e) {
           throw new HoodieException(e);
         }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 7edbb55c3e..14016cb5c0 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -58,8 +58,8 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
   }
 
   @Override
-  public List<HoodieRecord<T>> deduplicateRecords(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger recordMerger) {
+  public List<HoodieRecord<T>> deduplicateRecordsInternal(
+      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, Properties props, HoodieRecordMerger merger) {
     boolean isIndexingGlobal = index.isGlobal();
     Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
       HoodieKey hoodieKey = record.getKey();
@@ -72,7 +72,7 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
     return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
       HoodieRecord<T> reducedRecord;
       try {
-        reducedRecord =  recordMerger.merge(rec1, rec2, schema, props).get();
+        reducedRecord =  merger.merge(rec1, schema, rec2, schema, props).get().getLeft();
       } catch (IOException e) {
         throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
       }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 4c73012d2c..64264a1a10 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -85,7 +85,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
 import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
 import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
@@ -330,14 +329,11 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
             try {
               Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
               HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(hadoopConf.get(), new Path(clusteringOp.getDataFilePath()));
-              Option<BaseKeyGenerator> keyGeneratorOp;
-              if (!Boolean.parseBoolean(writeConfig.getProps().getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
-                keyGeneratorOp = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
-              } else {
-                keyGeneratorOp = Option.empty();
-              }
+              Option<BaseKeyGenerator> keyGeneratorOp =
+                  writeConfig.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
               MappingIterator mappingIterator = new MappingIterator((ClosableIterator<HoodieRecord>) baseFileReader.getRecordIterator(readerSchema),
-                  rec -> ((HoodieRecord) rec).wrapIntoHoodieRecordPayloadWithKeyGen(writeConfig.getProps(), keyGeneratorOp));
+                  rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema,
+                      writeConfig.getProps(), keyGeneratorOp));
               iteratorsForPartition.add(mappingIterator);
             } catch (IOException e) {
               throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
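
The key-generator option is now derived from HoodieWriteConfig#populateMetaFields() instead of re-parsing POPULATE_META_FIELDS from raw properties: when meta fields are populated the key can be read from the meta columns, otherwise a key generator is required. A small sketch of that decision, assuming the factory import path used elsewhere in the codebase (illustrative class name):

import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;

import java.io.IOException;

final class KeyGeneratorOptionSketch {
  // Meta fields populated -> keys come from the meta columns, no key generator needed.
  // Otherwise build a Spark key generator from the write config's properties.
  static Option<BaseKeyGenerator> keyGeneratorFor(HoodieWriteConfig writeConfig) throws IOException {
    return writeConfig.populateMetaFields()
        ? Option.empty()
        : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
  }
}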
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 918542dbef..765fa663f2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -63,8 +63,6 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
-import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
-
 /**
  * Clustering strategy to submit single spark jobs.
  * MultipleSparkJobExecution strategy is not ideal for use cases that require large number of clustering groups
@@ -150,16 +148,12 @@ public abstract class SingleSparkJobExecutionStrategy<T>
       Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
       Iterable<HoodieRecord<T>> indexedRecords = () -> {
         try {
-
           HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
-          Option<BaseKeyGenerator> keyGeneratorOp;
-          if (!Boolean.parseBoolean(getWriteConfig().getProps().getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
-            keyGeneratorOp = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(getWriteConfig().getProps()));
-          } else {
-            keyGeneratorOp = Option.empty();
-          }
+          Option<BaseKeyGenerator> keyGeneratorOp =
+              getWriteConfig().populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(getWriteConfig().getProps()));
           MappingIterator mappingIterator = new MappingIterator((ClosableIterator<HoodieRecord>) baseFileReader.getRecordIterator(readerSchema),
-              rec -> ((HoodieRecord) rec).wrapIntoHoodieRecordPayloadWithKeyGen(getWriteConfig().getProps(), keyGeneratorOp));
+              rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema,
+                  getWriteConfig().getProps(), keyGeneratorOp));
           return mappingIterator;
         } catch (IOException e) {
           throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
index f866914b27..e111679842 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
@@ -24,8 +24,10 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
@@ -37,7 +39,7 @@ import org.apache.spark.sql.HoodieUnsafeRowUtils;
 import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
 import org.apache.spark.sql.catalyst.CatalystTypeConverters;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -47,6 +49,8 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.hudi.util.HoodieSparkRecordUtils.getNullableValAsString;
+import static org.apache.hudi.util.HoodieSparkRecordUtils.getValue;
 import static org.apache.spark.sql.types.DataTypes.BooleanType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
@@ -55,60 +59,77 @@ import static org.apache.spark.sql.types.DataTypes.StringType;
  */
 public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
 
-  private StructType structType = null;
-  private Option<Long> schemaFingerPrint = Option.empty();
+  /**
+   * Tracks whether the underlying InternalRow has already been copied, so it is not copied twice.
+   */
+  private boolean copy;
 
+  /**
+   * Use this constructor when the InternalRow is read directly from a file.
+   * Records constructed this way must only be used within the reader iterator,
+   * or be copied via copy() before they are buffered.
+   */
   public HoodieSparkRecord(InternalRow data, StructType schema) {
     super(null, data);
-    initSchema(schema);
+    this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, false);
+    this.copy = false;
   }
 
   public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema) {
     super(key, data);
-    initSchema(schema);
+    this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true);
+    this.copy = true;
   }
 
   public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation) {
     super(key, data, operation);
-    initSchema(schema);
+    this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true);
+    this.copy = true;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, boolean copy) {
+    super(key, data, operation);
+    this.data = HoodieInternalRowUtils.projectUnsafe(data, schema, true);
+    this.copy = copy;
   }
 
   public HoodieSparkRecord(HoodieSparkRecord record) {
     super(record);
-    initSchema(record.getStructType());
+    this.copy = record.copy;
   }
 
   @Override
-  public HoodieRecord<InternalRow> newInstance() {
+  public HoodieSparkRecord newInstance() {
     return new HoodieSparkRecord(this);
   }
 
   @Override
-  public HoodieRecord<InternalRow> newInstance(HoodieKey key, HoodieOperation op) {
-    return new HoodieSparkRecord(key, data, getStructType(), op);
+  public HoodieSparkRecord newInstance(HoodieKey key, HoodieOperation op) {
+    return new HoodieSparkRecord(key, data, null, op);
   }
 
   @Override
-  public HoodieRecord<InternalRow> newInstance(HoodieKey key) {
-    return new HoodieSparkRecord(key, data, getStructType());
+  public HoodieSparkRecord newInstance(HoodieKey key) {
+    return new HoodieSparkRecord(key, data, null);
   }
 
   @Override
-  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+  public String getRecordKey(Schema recordSchema, Option<BaseKeyGenerator> keyGeneratorOpt) {
     if (key != null) {
       return getRecordKey();
     }
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get())
-        .getRecordKey(data, getStructType()).toString() : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
+        .getRecordKey(data, structType).toString() : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
   }
 
   @Override
-  public String getRecordKey(String keyFieldName) {
+  public String getRecordKey(Schema recordSchema, String keyFieldName) {
     if (key != null) {
       return getRecordKey();
     }
-    DataType dataType = getStructType().apply(keyFieldName).dataType();
-    int pos = getStructType().fieldIndex(keyFieldName);
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    DataType dataType = structType.apply(keyFieldName).dataType();
+    int pos = structType.fieldIndex(keyFieldName);
     return data.get(pos, dataType).toString();
   }
 
@@ -118,76 +139,80 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
   }
 
   @Override
-  public Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
-    return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, getStructType(), consistentLogicalTimestampEnabled);
+  public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, structType, consistentLogicalTimestampEnabled);
   }
 
   @Override
-  public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException {
-    StructType otherStructType = ((HoodieSparkRecord) other).getStructType();
-    StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
-    InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, getStructType(), (InternalRow) other.getData(), otherStructType, writerStructType);
-    return new HoodieSparkRecord(getKey(), mergeRow, writerStructType, getOperation());
+  public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException {
+    StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+    InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
+    return new HoodieSparkRecord(getKey(), mergeRow, targetStructType, getOperation(), copy);
   }
 
   @Override
   public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
-    UTF8String[] metaFields = extractMetaField(targetStructType);
+    UTF8String[] metaFields = extractMetaField(structType, targetStructType);
     if (metaFields.length == 0) {
       throw new UnsupportedOperationException();
     }
 
-    InternalRow resultRow;
-    if (extractMetaField(getStructType()).length == 0) {
-      resultRow = new HoodieInternalRow(metaFields, data, false);
-    } else {
-      resultRow = new HoodieInternalRow(metaFields, data, true);
-    }
-
-    return new HoodieSparkRecord(getKey(), resultRow, targetStructType, getOperation());
+    boolean containMetaFields = hasMetaField(structType);
+    InternalRow resultRow = new HoodieInternalRow(metaFields, data, containMetaFields);
+    return new HoodieSparkRecord(getKey(), resultRow, targetStructType, getOperation(), copy);
   }
 
   @Override
   public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
-    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, getStructType(), newStructType, renameCols);
-    UnsafeProjection unsafeConvert = HoodieInternalRowUtils.getCachedUnsafeConvert(newStructType);
-    InternalRow resultRow = unsafeConvert.apply(rewriteRow);
-    UTF8String[] metaFields = extractMetaField(newStructType);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
+    UTF8String[] metaFields = extractMetaField(structType, newStructType);
     if (metaFields.length > 0) {
-      resultRow = new HoodieInternalRow(metaFields, data, true);
+      rewriteRow = new HoodieInternalRow(metaFields, data, true);
     }
 
-    return new HoodieSparkRecord(getKey(), resultRow, newStructType, getOperation());
+    return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation(), copy);
   }
 
   @Override
-  public HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException {
-    metadataValues.forEach((key, value) -> {
-      int pos = getStructType().fieldIndex(key);
+  public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    metadataValues.getKv().forEach((key, value) -> {
+      int pos = structType.fieldIndex(key);
       if (value != null) {
         data.update(pos, CatalystTypeConverters.convertToCatalyst(value));
       }
     });
 
-    return new HoodieSparkRecord(getKey(), data, getStructType(), getOperation());
+    return new HoodieSparkRecord(getKey(), data, structType, getOperation(), copy);
+  }
+
+  @Override
+  public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) {
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    int pos = structType.fieldIndex(keyFieldName);
+    data.update(pos, CatalystTypeConverters.convertToCatalyst(StringUtils.EMPTY_STRING));
+    return this;
   }
 
   @Override
-  public boolean isDelete(Schema schema, Properties props) throws IOException {
+  public boolean isDelete(Schema recordSchema, Properties props) throws IOException {
     if (null == data) {
       return true;
     }
-    if (schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) == null) {
+    if (recordSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) == null) {
       return false;
     }
-    Object deleteMarker = data.get(schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).pos(), BooleanType);
+    Object deleteMarker = data.get(recordSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).pos(), BooleanType);
     return deleteMarker instanceof Boolean && (boolean) deleteMarker;
   }
 
   @Override
-  public boolean shouldIgnore(Schema schema, Properties props) throws IOException {
+  public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
     if (data != null && data.equals(SENTINEL)) {
       return true;
     } else {
@@ -197,34 +222,36 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
 
   @Override
   public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
-      Schema schema, Properties props,
+      Schema recordSchema, Properties props,
       Option<Pair<String, String>> simpleKeyGenFieldsOpt,
       Boolean withOperation,
       Option<String> partitionNameOp,
       Boolean populateMetaFields) {
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     if (populateMetaFields) {
-      return HoodieSparkRecordUtils.convertToHoodieSparkRecord(getStructType(), data, withOperation);
+      return convertToHoodieSparkRecord(structType, this, withOperation);
     } else if (simpleKeyGenFieldsOpt.isPresent()) {
-      return HoodieSparkRecordUtils.convertToHoodieSparkRecord(getStructType(), data, simpleKeyGenFieldsOpt.get(), withOperation, Option.empty());
+      return convertToHoodieSparkRecord(structType, this, simpleKeyGenFieldsOpt.get(), withOperation, Option.empty());
     } else {
-      return HoodieSparkRecordUtils.convertToHoodieSparkRecord(getStructType(), data, withOperation, partitionNameOp);
+      return convertToHoodieSparkRecord(structType, this, withOperation, partitionNameOp);
     }
   }
 
   @Override
-  public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Properties props, Option<BaseKeyGenerator> keyGen) {
+  public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, Properties props, Option<BaseKeyGenerator> keyGen) {
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     String key;
     String partition;
     if (keyGen.isPresent() && !Boolean.parseBoolean(props.getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
       SparkKeyGeneratorInterface keyGenerator = (SparkKeyGeneratorInterface) keyGen.get();
-      key = keyGenerator.getRecordKey(data, getStructType()).toString();
-      partition = keyGenerator.getPartitionPath(data, getStructType()).toString();
+      key = keyGenerator.getRecordKey(data, structType).toString();
+      partition = keyGenerator.getPartitionPath(data, structType).toString();
     } else {
       key = data.get(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal(), StringType).toString();
       partition = data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), StringType).toString();
     }
     HoodieKey hoodieKey = new HoodieKey(key, partition);
-    return new HoodieSparkRecord(hoodieKey, data, getStructType(), getOperation());
+    return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(), copy);
   }
 
   @Override
@@ -233,51 +260,75 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
   }
 
   @Override
-  public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties prop) throws IOException {
+  public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties prop) throws IOException {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public Comparable<?> getOrderingValue(Properties props) {
+  public HoodieSparkRecord copy() {
+    if (!copy) {
+      this.data = this.data.copy();
+      copy = true;
+    }
+    return this;
+  }
+
+  @Override
+  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     String orderingField = ConfigUtils.getOrderingField(props);
-    if (!HoodieCatalystExpressionUtils$.MODULE$.existField(getStructType(), orderingField)) {
+    if (!HoodieCatalystExpressionUtils$.MODULE$.existField(structType, orderingField)) {
       return 0;
     } else {
-      NestedFieldPath nestedFieldPath = HoodieInternalRowUtils.getCachedPosList(getStructType(),
-          orderingField);
-      Comparable<?> value = (Comparable<?>) HoodieUnsafeRowUtils.getNestedInternalRowValue(
-          data, nestedFieldPath);
+      NestedFieldPath nestedFieldPath = HoodieInternalRowUtils.getCachedPosList(structType, orderingField);
+      Comparable<?> value = (Comparable<?>) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
       return value;
     }
   }
 
-  public StructType getStructType() {
-    if (schemaFingerPrint.isPresent()) {
-      return HoodieInternalRowUtils.getCachedSchemaFromFingerPrint(schemaFingerPrint.get());
-    } else {
-      return structType;
-    }
+  private UTF8String[] extractMetaField(StructType recordStructType, StructType structTypeWithMetaField) {
+    return HOODIE_META_COLUMNS_WITH_OPERATION.stream()
+        .filter(f -> HoodieCatalystExpressionUtils$.MODULE$.existField(structTypeWithMetaField, f))
+        .map(field -> {
+          if (HoodieCatalystExpressionUtils$.MODULE$.existField(recordStructType, field)) {
+            return data.getUTF8String(HOODIE_META_COLUMNS_NAME_TO_POS.get(field));
+          } else {
+            return UTF8String.EMPTY_UTF8;
+          }
+        }).toArray(UTF8String[]::new);
   }
 
-  private void initSchema(StructType structType) {
-    if (HoodieInternalRowUtils.containsCompressedSchema(structType)) {
-      HoodieInternalRowUtils.addCompressedSchema(structType);
-      this.schemaFingerPrint = Option.of(HoodieInternalRowUtils.getCachedFingerPrintFromSchema(structType));
-    } else {
-      this.structType = structType;
-    }
+  private static boolean hasMetaField(StructType structType) {
+    return HoodieCatalystExpressionUtils$.MODULE$.existField(structType, COMMIT_TIME_METADATA_FIELD);
   }
 
-  public void setStructType(StructType structType) {
-    if (structType != null) {
-      initSchema(structType);
-    }
+  /**
+   * Utility method to convert InternalRow to HoodieRecord using schema and payload class.
+   */
+  private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, HoodieSparkRecord record, boolean withOperationField) {
+    return convertToHoodieSparkRecord(structType, record,
+        Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+        withOperationField, Option.empty());
   }
 
-  private UTF8String[] extractMetaField(StructType structType) {
-    return HOODIE_META_COLUMNS_WITH_OPERATION.stream()
-        .filter(f -> HoodieCatalystExpressionUtils$.MODULE$.existField(structType, f))
-        .map(UTF8String::fromString)
-        .toArray(UTF8String[]::new);
+  private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, HoodieSparkRecord record, boolean withOperationField,
+      Option<String> partitionName) {
+    return convertToHoodieSparkRecord(structType, record,
+        Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+        withOperationField, partitionName);
+  }
+
+  /**
+   * Utility method to convert an InternalRow-backed record into a keyed HoodieSparkRecord using the given schema.
+   */
+  private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, HoodieSparkRecord record, Pair<String, String> recordKeyPartitionPathFieldPair,
+      boolean withOperationField, Option<String> partitionName) {
+    final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), record.data).toString();
+    final String partitionPath = (partitionName.isPresent() ? partitionName.get() :
+        getValue(structType, recordKeyPartitionPathFieldPair.getRight(), record.data).toString());
+
+    HoodieOperation operation = withOperationField
+        ? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+    return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), record.data, structType, operation, record.copy);
   }
 }
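
The reworked HoodieSparkRecord is copy-on-demand: records built straight from a reader wrap the (possibly reused) row with copy == false, and copy() detaches the backing InternalRow exactly once. A usage sketch under those assumptions (illustrative class name):

import org.apache.hudi.commmon.model.HoodieSparkRecord;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

final class SparkRecordCopySketch {
  // Wrap a row obtained from a reader whose buffer may be reused, then detach it
  // exactly once before it escapes the iterator (subsequent copy() calls are no-ops).
  static HoodieSparkRecord bufferSafely(InternalRow rowFromReader, StructType schema) {
    HoodieSparkRecord record = new HoodieSparkRecord(rowFromReader, schema); // copy == false
    return record.copy(); // copies the backing InternalRow once and flips the flag
  }
}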
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
index 75be1402ea..0735527ae7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
@@ -28,6 +28,8 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.FlatLists;
+import org.apache.hudi.common.util.collection.FlatLists.ComparableList;
 import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
 import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
 import org.apache.hudi.io.AppendHandleFactory;
@@ -234,9 +236,9 @@ public class RDDConsistentBucketPartitioner<T> extends RDDBucketIndexPartitioner
     final String[] sortColumns = sortColumnNames;
     final SerializableSchema schema = new SerializableSchema(HoodieAvroUtils.addMetadataFields((new Schema.Parser().parse(table.getConfig().getSchema()))));
     Comparator<HoodieRecord<T>> comparator = (Comparator<HoodieRecord<T>> & Serializable) (t1, t2) -> {
-      Object obj1 = t1.getRecordColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
-      Object obj2 = t2.getRecordColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
-      return ((Comparable) obj1).compareTo(obj2);
+      ComparableList obj1 = FlatLists.ofComparableArray(t1.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled));
+      ComparableList obj2 = FlatLists.ofComparableArray(t2.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled));
+      return obj1.compareTo(obj2);
     };
 
     return records.mapToPair(record -> new Tuple2<>(record, record))
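
Sort keys are now compared as an array of column values wrapped in a FlatLists.ComparableList rather than a concatenated string. A sketch of such a comparator, assuming the getColumnValues/ofComparableArray signatures shown above (illustrative class name):

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.FlatLists;
import org.apache.hudi.common.util.collection.FlatLists.ComparableList;

import java.io.Serializable;
import java.util.Comparator;

final class SortColumnsComparatorSketch {
  // Compare two records column-by-column: getColumnValues returns Object[], which FlatLists
  // wraps into a ComparableList so that the sort keys compare element-wise.
  static <T> Comparator<HoodieRecord<T>> byColumns(Schema schema, String[] sortColumns, boolean consistentLogicalTimestampEnabled) {
    return (Comparator<HoodieRecord<T>> & Serializable) (r1, r2) -> {
      ComparableList left = FlatLists.ofComparableArray(r1.getColumnValues(schema, sortColumns, consistentLogicalTimestampEnabled));
      ComparableList right = FlatLists.ofComparableArray(r2.getColumnValues(schema, sortColumns, consistentLogicalTimestampEnabled));
      return left.compareTo(right);
    };
  }
}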
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index dbbcb22e90..e723d724b6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -61,7 +61,7 @@ public class RDDCustomColumnsSortPartitioner<T>
     final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled;
     return records.sortBy(
         record -> {
-          Object recordValue = record.getRecordColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
+          Object recordValue = record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
           // null values are replaced with empty string for null_first order
           if (recordValue == null) {
             return StringUtils.EMPTY_STRING;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 5ba5c5dfdf..0a042d3362 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -37,7 +37,6 @@ import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -95,7 +94,6 @@ public class RDDSpatialCurveSortPartitioner<T>
           });
     } else if (recordType == HoodieRecordType.SPARK) {
       StructType structType = HoodieInternalRowUtils.getCachedSchema(schema.get());
-      Broadcast<StructType> structTypeBC = sparkEngineContext.getJavaSparkContext().broadcast(structType);
       Dataset<Row> sourceDataset = SparkConversionUtils.createDataFrame(records.rdd(), sparkEngineContext.getSqlContext().sparkSession(), structType);
 
       Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
@@ -107,7 +105,7 @@ public class RDDSpatialCurveSortPartitioner<T>
             String key = internalRow.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
             String partition = internalRow.getString(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal());
             HoodieKey hoodieKey = new HoodieKey(key, partition);
-            HoodieRecord hoodieRecord = new HoodieSparkRecord(hoodieKey, internalRow, structTypeBC.value());
+            HoodieRecord hoodieRecord = new HoodieSparkRecord(hoodieKey, internalRow, structType);
             return hoodieRecord;
           });
     } else {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
index f0d650ecdc..112981f902 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
@@ -27,17 +27,6 @@ import org.apache.spark.sql.internal.SQLConf;
 
 public class HoodieSparkFileReaderFactory extends HoodieFileReaderFactory  {
 
-  private static class SingletonHolder {
-    private static final HoodieSparkFileReaderFactory INSTANCE = new HoodieSparkFileReaderFactory();
-  }
-
-  private HoodieSparkFileReaderFactory() {
-  }
-
-  public static HoodieFileReaderFactory getFileReaderFactory() {
-    return SingletonHolder.INSTANCE;
-  }
-
   protected HoodieFileReader newParquetFileReader(Configuration conf, Path path) {
     conf.setIfUnset(SQLConf.PARQUET_BINARY_AS_STRING().key(),
         SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString());
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index f746bb0e8e..a4211656b0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -39,15 +39,6 @@ import java.io.IOException;
 
 public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
 
-  private static class SingletonHolder {
-
-    private static final HoodieSparkFileWriterFactory INSTANCE = new HoodieSparkFileWriterFactory();
-  }
-
-  public static HoodieFileWriterFactory getFileWriterFactory() {
-    return HoodieSparkFileWriterFactory.SingletonHolder.INSTANCE;
-  }
-
   @Override
   protected HoodieFileWriter newParquetFileWriter(
       String instantTime, Path path, Configuration conf, HoodieConfig config, Schema schema,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 7078393d28..a46174cbae 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -86,13 +86,13 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
     conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), (Boolean) SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()));
     conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), (Boolean) SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()));
     InputFile inputFile = HadoopInputFile.fromPath(path, conf);
-    ParquetReader reader = new ParquetReader.Builder<InternalRow>(inputFile) {
+    ParquetReader<InternalRow> reader = new ParquetReader.Builder<InternalRow>(inputFile) {
       @Override
       protected ReadSupport getReadSupport() {
         return new ParquetReadSupport();
       }
     }.withConf(conf).build();
-    ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader, InternalRow::copy);
+    ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader);
     readerIterators.add(parquetReaderIterator);
     return parquetReaderIterator;
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index b47a1c765d..3828f63564 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -22,7 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
+import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -44,7 +44,6 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Properties;
 
 class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
@@ -70,12 +69,15 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
             .getFileReader(table.getHadoopConf(), sourceFilePath);
     try {
       wrapper = new BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
-          reader.getRecordIterator(), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
+          reader.getRecordIterator(), new BootstrapRecordConsumer(bootstrapHandle), record -> {
         try {
-          String recKey = inp.getRecordKey(Option.of(keyGenerator));
-          HoodieRecord hoodieRecord = inp.rewriteRecord(reader.getSchema(), config.getProps(), HoodieAvroUtils.RECORD_KEY_SCHEMA);
+          HoodieRecord recordCopy = record.copy();
+          String recKey = recordCopy.getRecordKey(reader.getSchema(), Option.of(keyGenerator));
+          HoodieRecord hoodieRecord = recordCopy.rewriteRecord(reader.getSchema(), config.getProps(), HoodieAvroUtils.RECORD_KEY_SCHEMA);
+          MetadataValues metadataValues = new MetadataValues();
+          metadataValues.setRecordKey(recKey);
           return hoodieRecord
-              .updateValues(HoodieAvroUtils.RECORD_KEY_SCHEMA, new Properties(), Collections.singletonMap(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName(), recKey))
+              .updateMetadataValues(HoodieAvroUtils.RECORD_KEY_SCHEMA, new Properties(), metadataValues)
               .newInstance(new HoodieKey(recKey, partitionPath));
         } catch (IOException e) {
           LOG.error("Unable to overrideMetadataFieldValue", e);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java
index 4779e8e05f..65ac51a1c7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java
@@ -19,13 +19,8 @@
 package org.apache.hudi.util;
 
 import org.apache.hudi.HoodieInternalRowUtils;
-import org.apache.hudi.commmon.model.HoodieSparkRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
 import org.apache.spark.sql.HoodieUnsafeRowUtils;
@@ -35,37 +30,7 @@ import org.apache.spark.sql.types.StructType;
 
 public class HoodieSparkRecordUtils {
 
-  /**
-   * Utility method to convert InternalRow to HoodieRecord using schema and payload class.
-   */
-  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField) {
-    return convertToHoodieSparkRecord(structType, data,
-        Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
-        withOperationField, Option.empty());
-  }
-
-  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField,
-      Option<String> partitionName) {
-    return convertToHoodieSparkRecord(structType, data,
-        Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
-        withOperationField, partitionName);
-  }
-
-  /**
-   * Utility method to convert bytes to HoodieRecord using schema and payload class.
-   */
-  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, Pair<String, String> recordKeyPartitionPathFieldPair,
-      boolean withOperationField, Option<String> partitionName) {
-    final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), data).toString();
-    final String partitionPath = (partitionName.isPresent() ? partitionName.get() :
-        getValue(structType, recordKeyPartitionPathFieldPair.getRight(), data).toString());
-
-    HoodieOperation operation = withOperationField
-        ? HoodieOperation.fromName(getNullableValAsString(structType, data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
-    return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), data, structType, operation);
-  }
-
-  private static Object getValue(StructType structType, String fieldName, InternalRow row) {
+  public static Object getValue(StructType structType, String fieldName, InternalRow row) {
     NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
     return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
   }
@@ -77,7 +42,7 @@ public class HoodieSparkRecordUtils {
    * @param fieldName The field name
    * @return the string form of the field or empty if the schema does not contain the field name or the value is null
    */
-  private static Option<String> getNullableValAsString(StructType structType, InternalRow row, String fieldName) {
+  public static Option<String> getNullableValAsString(StructType structType, InternalRow row, String fieldName) {
     String fieldVal = !HoodieCatalystExpressionUtils$.MODULE$.existField(structType, fieldName)
         ? null : StringUtils.objToString(getValue(structType, fieldName, row));
     return Option.ofNullable(fieldVal);
@@ -91,21 +56,15 @@ public class HoodieSparkRecordUtils {
    * @param structType  {@link StructType} instance.
   * @return Array of column values, one entry per requested column.
    */
-  public static Object getRecordColumnValues(InternalRow row,
+  public static Object[] getRecordColumnValues(InternalRow row,
       String[] columns,
       StructType structType, boolean consistentLogicalTimestampEnabled) {
-    if (columns.length == 1) {
-      NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, columns[0]);
-      return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
-    } else {
-      // TODO this is inefficient, instead we can simply return array of Comparable
-      StringBuilder sb = new StringBuilder();
-      for (String col : columns) {
-        // TODO support consistentLogicalTimestampEnabled
-        NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, columns[0]);
-        return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
-      }
-      return sb.toString();
+    Object[] objects = new Object[columns.length];
+    for (int i = 0; i < objects.length; i++) {
+      NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, columns[i]);
+      Object value = HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
+      objects[i] = value;
     }
+    return objects;
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala
index cd259974d9..169ccd61c3 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala
@@ -21,16 +21,15 @@ package org.apache.hudi
 import java.nio.charset.StandardCharsets
 import java.util.HashMap
 import java.util.concurrent.ConcurrentHashMap
-import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.Schema
 import org.apache.hbase.thirdparty.com.google.common.base.Supplier
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
+import org.apache.hudi.client.model.HoodieInternalRow
 import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
 import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieCatalystExpressionUtils
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, Projection}
+import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieUnsafeRowUtils}
 import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.HoodieUnsafeRowUtils
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.types._
@@ -43,25 +42,9 @@ object HoodieInternalRowUtils {
     ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), UnsafeProjection]] {
       override def get(): HashMap[(StructType, StructType), UnsafeProjection] = new HashMap[(StructType, StructType), UnsafeProjection]
     })
-  val unsafeConvertThreadLocal: ThreadLocal[HashMap[StructType, UnsafeProjection]] =
-    ThreadLocal.withInitial(new Supplier[HashMap[StructType, UnsafeProjection]] {
-      override def get(): HashMap[StructType, UnsafeProjection] = new HashMap[StructType, UnsafeProjection]
-    })
   val schemaMap = new ConcurrentHashMap[Schema, StructType]
-  val schemaFingerPrintMap = new ConcurrentHashMap[Long, StructType]
-  val fingerPrintSchemaMap = new ConcurrentHashMap[StructType, Long]
   val orderPosListMap = new ConcurrentHashMap[(StructType, String), NestedFieldPath]
 
-  /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#stitchRecords(org.apache.avro.generic.GenericRecord, org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
-   */
-  def stitchRecords(left: InternalRow, leftSchema: StructType, right: InternalRow, rightSchema: StructType, stitchedSchema: StructType): InternalRow = {
-    val mergeSchema = StructType(leftSchema.fields ++ rightSchema.fields)
-    val row = new JoinedRow(left, right)
-    val projection = getCachedUnsafeProjection(mergeSchema, stitchedSchema)
-    projection(row)
-  }
-
   /**
    * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
    */
@@ -218,16 +201,7 @@ object HoodieInternalRowUtils {
     orderPosListMap.get(schemaPair)
   }
 
-  def getCachedUnsafeConvert(structType: StructType): UnsafeProjection = {
-    val map = unsafeConvertThreadLocal.get()
-    if (!map.containsKey(structType)) {
-      val projection = UnsafeProjection.create(structType)
-      map.put(structType, projection)
-    }
-    map.get(structType)
-  }
-
-  def getCachedUnsafeProjection(from: StructType, to: StructType): Projection = {
+  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
     val schemaPair = (from, to)
     val map = unsafeProjectionThreadLocal.get()
     if (!map.containsKey(schemaPair)) {
@@ -245,33 +219,16 @@ object HoodieInternalRowUtils {
     schemaMap.get(schema)
   }
 
-  def getCachedSchemaFromFingerPrint(fingerPrint: Long): StructType = {
-    if (!schemaFingerPrintMap.containsKey(fingerPrint)) {
-      throw new IllegalArgumentException("Not exist " + fingerPrint)
-    }
-    schemaFingerPrintMap.get(fingerPrint)
-  }
-
-  def getCachedFingerPrintFromSchema(schema: StructType): Long = {
-    if (!fingerPrintSchemaMap.containsKey(schema)) {
-      throw new IllegalArgumentException("Not exist " + schema)
-    }
-    fingerPrintSchemaMap.get(schema)
-  }
-
-  def addCompressedSchema(schema: StructType): Unit ={
-    if (!fingerPrintSchemaMap.containsKey(schema)) {
-      val fingerPrint = SchemaNormalization.fingerprint64(schema.json.getBytes(StandardCharsets.UTF_8))
-      schemaFingerPrintMap.put(fingerPrint, schema)
-      fingerPrintSchemaMap.put(schema, fingerPrint)
+  def projectUnsafe(row: InternalRow, structType: StructType, copy: Boolean = true): InternalRow = {
+    if (row == null || row.isInstanceOf[UnsafeRow] || row.isInstanceOf[HoodieInternalRow]) {
+      row
+    } else {
+      val unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType).apply(row)
+      if (copy) unsafeRow.copy() else unsafeRow
     }
   }
 
-  def containsCompressedSchema(schema: StructType): Boolean = {
-    fingerPrintSchemaMap.containsKey(schema)
-  }
-
-  private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
+  private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = {
     if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
       oldSchema match {
         case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | StringType | DateType | TimestampType | BinaryType =>
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala
deleted file mode 100644
index e3407d19b6..0000000000
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hudi
-
-import com.esotericsoftware.kryo.Kryo
-import com.esotericsoftware.kryo.io.{Input, Output}
-import com.twitter.chill.KSerializer
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import org.apache.avro.SchemaNormalization
-import org.apache.commons.io.IOUtils
-import org.apache.hudi.commmon.model.HoodieSparkRecord
-import org.apache.spark.io.CompressionCodec
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.Utils
-import org.apache.spark.{SparkEnv, SparkException}
-import scala.collection.mutable
-
-/**
- * Custom serializer used for generic spark records. If the user registers the schemas
- * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
- * schema, as to reduce network IO.
- * Actions like parsing or compressing schemas are computationally expensive so the serializer
- * caches all previously seen values as to reduce the amount of work needed to do.
- * @param schemas a map where the keys are unique IDs for spark schemas and the values are the
- *                string representation of the Avro schema, used to decrease the amount of data
- *                that needs to be serialized.
- */
-class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends KSerializer[HoodieSparkRecord] {
-  /** Used to reduce the amount of effort to compress the schema */
-  private val compressCache = new mutable.HashMap[StructType, Array[Byte]]()
-  private val decompressCache = new mutable.HashMap[ByteBuffer, StructType]()
-
-  /** Fingerprinting is very expensive so this alleviates most of the work */
-  private val fingerprintCache = new mutable.HashMap[StructType, Long]()
-  private val schemaCache = new mutable.HashMap[Long, StructType]()
-
-  // GenericAvroSerializer can't take a SparkConf in the constructor b/c then it would become
-  // a member of KryoSerializer, which would make KryoSerializer not Serializable.  We make
-  // the codec lazy here just b/c in some unit tests, we use a KryoSerializer w/out having
-  // the SparkEnv set (note those tests would fail if they tried to serialize avro data).
-  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
-
-  /**
-   * Used to compress Schemas when they are being sent over the wire.
-   * The compression results are memoized to reduce the compression time since the
-   * same schema is compressed many times over
-   */
-  def compress(schema: StructType): Array[Byte] = compressCache.getOrElseUpdate(schema, {
-    val bos = new ByteArrayOutputStream()
-    val out = codec.compressedOutputStream(bos)
-    Utils.tryWithSafeFinally {
-      out.write(schema.json.getBytes(StandardCharsets.UTF_8))
-    } {
-      out.close()
-    }
-    bos.toByteArray
-  })
-
-  /**
-   * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already
-   * seen values so to limit the number of times that decompression has to be done.
-   */
-  def decompress(schemaBytes: ByteBuffer): StructType = decompressCache.getOrElseUpdate(schemaBytes, {
-    val bis = new ByteArrayInputStream(
-      schemaBytes.array(),
-      schemaBytes.arrayOffset() + schemaBytes.position(),
-      schemaBytes.remaining())
-    val in = codec.compressedInputStream(bis)
-    val bytes = Utils.tryWithSafeFinally {
-      IOUtils.toByteArray(in)
-    } {
-      in.close()
-    }
-    StructType.fromString(new String(bytes, StandardCharsets.UTF_8))
-  })
-
-  /**
-   * Serializes a record to the given output stream. It caches a lot of the internal data as
-   * to not redo work
-   */
-  def serializeDatum(datum: HoodieSparkRecord, output: Output): Unit = {
-    val schema = datum.getStructType
-    val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
-      SchemaNormalization.fingerprint64(schema.json.getBytes(StandardCharsets.UTF_8))
-    })
-    schemas.get(fingerprint) match {
-      case Some(_) =>
-        output.writeBoolean(true)
-        output.writeLong(fingerprint)
-      case None =>
-        output.writeBoolean(false)
-        val compressedSchema = compress(schema)
-        output.writeInt(compressedSchema.length)
-        output.writeBytes(compressedSchema)
-    }
-
-    val record = datum.newInstance().asInstanceOf[HoodieSparkRecord]
-    record.setStructType(null)
-    val stream = new ObjectOutputStream(output)
-    stream.writeObject(record)
-    stream.close()
-  }
-
-  /**
-   * Deserializes generic records into their in-memory form. There is internal
-   * state to keep a cache of already seen schemas and datum readers.
-   */
-  def deserializeDatum(input: Input): HoodieSparkRecord = {
-    val schema = {
-      if (input.readBoolean()) {
-        val fingerprint = input.readLong()
-        schemaCache.getOrElseUpdate(fingerprint, {
-          schemas.get(fingerprint) match {
-            case Some(s) => s
-            case None =>
-              throw new SparkException(
-                "Error reading attempting to read spark data -- encountered an unknown " +
-                  s"fingerprint: $fingerprint, not sure what schema to use.  This could happen " +
-                  "if you registered additional schemas after starting your spark context.")
-          }
-        })
-      } else {
-        val length = input.readInt()
-        decompress(ByteBuffer.wrap(input.readBytes(length)))
-      }
-    }
-    val stream = new ObjectInputStream(input)
-    val record = stream.readObject().asInstanceOf[HoodieSparkRecord]
-    stream.close()
-    record.setStructType(schema)
-
-    record
-  }
-
-  override def write(kryo: Kryo, output: Output, datum: HoodieSparkRecord): Unit =
-    serializeDatum(datum, output)
-
-  override def read(kryo: Kryo, input: Input, datumClass: Class[HoodieSparkRecord]): HoodieSparkRecord =
-    deserializeDatum(input)
-}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index c0a0307c3e..8ae4385500 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -474,7 +474,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieRecordMerger recordMerger = HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
     int dedupParallelism = records.getNumPartitions() + 100;
     HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
-        HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
+        (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
+            .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
     List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
     assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
     assertEquals(1, dedupedRecs.size());
@@ -484,7 +485,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // non-Global dedup should be done based on both recordKey and partitionPath
     index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(false);
-    dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
+    dedupedRecsRdd =
+        (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
+            .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
     dedupedRecs = dedupedRecsRdd.collectAsList();
     assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
     assertEquals(2, dedupedRecs.size());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index 9fb1862c5f..a4f723ff01 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -269,8 +269,8 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
     final SizeEstimator<Tuple2<HoodieRecord, Option<IndexedRecord>>> sizeEstimator = new DefaultSizeEstimator<>();
     // queue memory limit
     HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult =
-        getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord) hoodieRecords.get(0));
-    final long objSize = sizeEstimator.sizeEstimate(new Tuple2<>(genResult.getResult(), genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties())));
+        getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0));
+    final long objSize = sizeEstimator.sizeEstimate(new Tuple2<>(genResult.getResult(), genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties())));
     final long memoryLimitInBytes = 4 * objSize;
 
     // first let us throw exception from queueIterator reader and test that queueing thread
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 2af3f8f3a1..9496a8083f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -115,7 +115,7 @@ public class HoodieAvroUtils {
    * TODO serialize other type of record.
    */
   public static Option<byte[]> recordToBytes(HoodieRecord record, Schema schema) throws IOException {
-    return Option.of(HoodieAvroUtils.indexedRecordToBytes(((HoodieAvroIndexedRecord) record.toIndexedRecord(schema, new Properties()).get()).getData()));
+    return Option.of(HoodieAvroUtils.indexedRecordToBytes(record.toIndexedRecord(schema, new Properties()).get().getData()));
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 28265bbbf4..913bbcb97c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.model;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -75,7 +76,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   }
 
   @Override
-  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+  public String getRecordKey(Schema recordSchema, Option<BaseKeyGenerator> keyGeneratorOpt) {
     return keyGeneratorOpt.isPresent() ? keyGeneratorOpt.get().getRecordKey((GenericRecord) data) : ((GenericRecord) data).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
   }
 
@@ -85,19 +86,20 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   }
 
   @Override
-  public String getRecordKey(String keyFieldName) {
+  public String getRecordKey(Schema recordSchema, String keyFieldName) {
     return Option.ofNullable(data.getSchema().getField(keyFieldName))
         .map(keyField -> data.get(keyField.pos()))
         .map(Object::toString).orElse(null);
   }
 
   @Override
-  public Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+  public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException {
+  public HoodieRecord joinWith(HoodieRecord other,
+      Schema targetSchema) throws IOException {
     GenericRecord record = HoodieAvroUtils.stitchRecords((GenericRecord) data, (GenericRecord) other.getData(), targetSchema);
     return new HoodieAvroIndexedRecord(record);
   }
@@ -115,8 +117,8 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   }
 
   @Override
-  public HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException {
-    metadataValues.forEach((key, value) -> {
+  public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
+    metadataValues.getKv().forEach((key, value) -> {
       if (value != null) {
         ((GenericRecord) data).put(key, value);
       }
@@ -126,18 +128,29 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   }
 
   @Override
-  public boolean isDelete(Schema schema, Properties props) {
+  public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) {
+    ((GenericRecord) data).put(keyFieldName, StringUtils.EMPTY_STRING);
+    return this;
+  }
+
+  @Override
+  public boolean isDelete(Schema recordSchema, Properties props) {
     return false;
   }
 
   @Override
-  public boolean shouldIgnore(Schema schema, Properties props) throws IOException {
+  public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
     return getData().equals(SENTINEL);
   }
 
+  @Override
+  public HoodieRecord<IndexedRecord> copy() {
+    return this;
+  }
+
   @Override
   public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
-      Schema schema,
+      Schema recordSchema,
       Properties props,
       Option<Pair<String, String>> simpleKeyGenFieldsOpt,
       Boolean withOperation,
@@ -149,7 +162,8 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   }
 
   @Override
-  public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Properties props, Option<BaseKeyGenerator> keyGen) {
+  public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema,
+      Properties props, Option<BaseKeyGenerator> keyGen) {
     GenericRecord record = (GenericRecord) data;
     String key;
     String partition;
@@ -174,7 +188,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   }
 
   @Override
-  public Comparable<?> getOrderingValue(Properties props) {
+  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
     boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(props.getProperty(
         KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
         KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
@@ -184,7 +198,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   }
 
   @Override
-  public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props) {
+  public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) {
     return Option.of(this);
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 65f15ca6a4..dfd2b4ba33 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -22,6 +22,7 @@ package org.apache.hudi.common.model;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 
@@ -74,17 +75,18 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
   }
 
   @Override
-  public Comparable<?> getOrderingValue(Properties props) {
+  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
     return this.getData().getOrderingValue();
   }
 
   @Override
-  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+  public String getRecordKey(Schema recordSchema,
+      Option<BaseKeyGenerator> keyGeneratorOpt) {
     return getRecordKey();
   }
 
   @Override
-  public String getRecordKey(String keyFieldName) {
+  public String getRecordKey(Schema recordSchema, String keyFieldName) {
     return getRecordKey();
   }
 
@@ -94,12 +96,13 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
   }
 
   @Override
-  public Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
-    return HoodieAvroUtils.getRecordColumnValues(this, columns, recordSchema, consistentLogicalTimestampEnabled);
+  public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+    return new Object[]{HoodieAvroUtils.getRecordColumnValues(this, columns, recordSchema, consistentLogicalTimestampEnabled)};
   }
 
   @Override
-  public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException {
+  public HoodieRecord joinWith(HoodieRecord other,
+      Schema targetSchema) throws IOException {
     throw new UnsupportedOperationException();
   }
 
@@ -119,10 +122,10 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
   }
 
   @Override
-  public HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException {
+  public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
     GenericRecord avroRecordPayload = (GenericRecord) getData().getInsertValue(recordSchema, props).get();
 
-    metadataValues.forEach((key, value) -> {
+    metadataValues.getKv().forEach((key, value) -> {
       if (value != null) {
         avroRecordPayload.put(key, value);
       }
@@ -132,13 +135,20 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
   }
 
   @Override
-  public boolean isDelete(Schema schema, Properties props) throws IOException {
-    return !getData().getInsertValue(schema, props).isPresent();
+  public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) throws IOException {
+    GenericRecord avroRecordPayload = (GenericRecord) getData().getInsertValue(recordSchema, props).get();
+    avroRecordPayload.put(keyFieldName, StringUtils.EMPTY_STRING);
+    return new HoodieAvroRecord<>(getKey(), new RewriteAvroPayload(avroRecordPayload), getOperation());
+  }
+
+  @Override
+  public boolean isDelete(Schema recordSchema, Properties props) throws IOException {
+    return !getData().getInsertValue(recordSchema, props).isPresent();
   }
 
   @Override
-  public boolean shouldIgnore(Schema schema, Properties props) throws IOException {
-    Option<IndexedRecord> insertRecord = getData().getInsertValue(schema, props);
+  public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
+    Option<IndexedRecord> insertRecord = getData().getInsertValue(recordSchema, props);
     // just skip the ignored record
     if (insertRecord.isPresent() && insertRecord.get().equals(SENTINEL)) {
       return true;
@@ -147,21 +157,27 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
     }
   }
 
+  @Override
+  public HoodieRecord<T> copy() {
+    return this;
+  }
+
   @Override
   public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
-      Schema schema, Properties props,
+      Schema recordSchema, Properties props,
       Option<Pair<String, String>> simpleKeyGenFieldsOpt,
       Boolean withOperation,
       Option<String> partitionNameOp,
       Boolean populateMetaFields) throws IOException {
-    IndexedRecord indexedRecord = (IndexedRecord) data.getInsertValue(schema, props).get();
+    IndexedRecord indexedRecord = (IndexedRecord) data.getInsertValue(recordSchema, props).get();
     String payloadClass = ConfigUtils.getPayloadClass(props);
     String preCombineField = ConfigUtils.getOrderingField(props);
     return HoodieAvroUtils.createHoodieRecordFromAvro(indexedRecord, payloadClass, preCombineField, simpleKeyGenFieldsOpt, withOperation, partitionNameOp, populateMetaFields);
   }
 
   @Override
-  public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Properties props, Option<BaseKeyGenerator> keyGen) {
+  public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema,
+      Properties props, Option<BaseKeyGenerator> keyGen) {
     throw new UnsupportedOperationException();
   }
 
@@ -170,8 +186,8 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
   }
 
   @Override
-  public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props) throws IOException {
-    Option<IndexedRecord> avroData = getData().getInsertValue(schema, props);
+  public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) throws IOException {
+    Option<IndexedRecord> avroData = getData().getInsertValue(recordSchema, props);
     if (avroData.isPresent()) {
       return Option.of(new HoodieAvroIndexedRecord(avroData.get()));
     } else {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
index 9fa70a3719..e57a18b592 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
@@ -23,8 +23,8 @@ import org.apache.avro.generic.IndexedRecord;
 
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 
 import java.io.IOException;
@@ -34,21 +34,28 @@ import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 
 public class HoodieAvroRecordMerger implements HoodieRecordMerger {
 
+  public static final String DE_DUPING = "de_duping";
+
   @Override
   public String getMergingStrategy() {
-    return StringUtils.DEFAULT_MERGER_STRATEGY_UUID;
+    return HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
   }
 
   @Override
-  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException {
     ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.AVRO);
     ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.AVRO);
-    if (older instanceof HoodieAvroRecord && newer instanceof HoodieAvroRecord) {
-      return Option.of(preCombine(older, newer));
-    } else if (older instanceof HoodieAvroIndexedRecord && newer instanceof HoodieAvroRecord) {
-      return combineAndGetUpdateValue(older, newer, schema, props);
+    boolean deDuping = Boolean.parseBoolean(props.getOrDefault(DE_DUPING, "false").toString());
+    if (deDuping) {
+      HoodieRecord res = preCombine(older, newer);
+      if (res == older) {
+        return Option.of(Pair.of(res, oldSchema));
+      } else {
+        return Option.of(Pair.of(res, newSchema));
+      }
     } else {
-      throw new UnsupportedOperationException();
+      return combineAndGetUpdateValue(older, newer, newSchema, props)
+          .map(r -> Pair.of(r, (((HoodieAvroIndexedRecord) r).getData()).getSchema()));
     }
   }
 
@@ -67,12 +74,19 @@ public class HoodieAvroRecordMerger implements HoodieRecordMerger {
   }
 
   private Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
-    Option<HoodieAvroIndexedRecord> previousRecordAvroPayload = older.toIndexedRecord(schema, props);
-    if (!previousRecordAvroPayload.isPresent()) {
+    Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema, props).map(HoodieAvroIndexedRecord::getData);
+    if (!previousAvroData.isPresent()) {
       return Option.empty();
     }
 
-    return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousRecordAvroPayload.get().getData(), schema, props)
+    return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousAvroData.get(), schema, props)
         .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload));
   }
+
+  public static Properties withDeDuping(Properties props) {
+    Properties newProps = new Properties();
+    newProps.putAll(props);
+    newProps.setProperty(HoodieAvroRecordMerger.DE_DUPING, "true");
+    return newProps;
+  }
 }
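
A minimal sketch of how the new de-duping switch is intended to be used (illustrative only; older, newer and schema below are placeholder inputs, not code from this patch):

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieAvroRecordMerger;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordMerger;
    import org.apache.hudi.common.util.HoodieRecordUtils;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;

    class DeDupingMergeSketch {
      // Merges two versions of the same key with precombine (de-duping) semantics.
      static Option<Pair<HoodieRecord, Schema>> dedup(HoodieRecord older, HoodieRecord newer,
                                                      Schema schema, Properties props) throws IOException {
        // withDeDuping copies the props and sets "de_duping" = true, so merge()
        // routes through preCombine instead of combineAndGetUpdateValue.
        Properties deDupProps = HoodieAvroRecordMerger.withDeDuping(props);
        HoodieRecordMerger merger = HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
        return merger.merge(older, schema, newer, schema, deDupProps);
      }
    }
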
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index f2167bb15e..74954a5e63 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -45,24 +45,13 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
     this.orderingVal = orderingVal;
   }
 
-  public HoodieEmptyRecord(HoodieRecord<T> record, HoodieRecordType type) {
-    super(record);
-    this.type = type;
-    this.orderingVal = record.getOrderingValue(new Properties());
-  }
-
-  public HoodieEmptyRecord(HoodieRecordType type) {
-    this.type = type;
-    this.orderingVal = null;
-  }
-
   @Override
   public T getData() {
     return null;
   }
 
   @Override
-  public Comparable<?> getOrderingValue(Properties props) {
+  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
     return orderingVal;
   }
 
@@ -87,22 +76,24 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
   }
 
   @Override
-  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+  public String getRecordKey(Schema recordSchema,
+      Option<BaseKeyGenerator> keyGeneratorOpt) {
     return key.getRecordKey();
   }
 
   @Override
-  public String getRecordKey(String keyFieldName) {
+  public String getRecordKey(Schema recordSchema, String keyFieldName) {
     return key.getRecordKey();
   }
 
   @Override
-  public Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+  public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException {
+  public HoodieRecord joinWith(HoodieRecord other,
+      Schema targetSchema) throws IOException {
     throw new UnsupportedOperationException();
   }
 
@@ -117,34 +108,44 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
   }
 
   @Override
-  public HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException {
+  public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public boolean isDelete(Schema schema, Properties props) throws IOException {
+  public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isDelete(Schema recordSchema, Properties props) throws IOException {
     return true;
   }
 
   @Override
-  public boolean shouldIgnore(Schema schema, Properties props) throws IOException {
+  public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
     return false;
   }
 
   @Override
-  public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(Schema schema, Properties props, Option<Pair<String, String>> simpleKeyGenFieldsOpt, Boolean withOperation, Option<String> partitionNameOp,
-      Boolean populateMetaFieldsOp)
-      throws IOException {
+  public HoodieRecord<T> copy() {
+    return this;
+  }
+
+  @Override
+  public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(Schema recordSchema, Properties props, Option<Pair<String, String>> simpleKeyGenFieldsOpt,
+      Boolean withOperation, Option<String> partitionNameOp, Boolean populateMetaFieldsOp) throws IOException {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Properties props, Option<BaseKeyGenerator> keyGen) {
+  public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema,
+      Properties props, Option<BaseKeyGenerator> keyGen) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props) throws IOException {
+  public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) throws IOException {
     return Option.empty();
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 87ed4d10f7..1acefe2204 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -136,7 +136,7 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
   /**
    * The cdc operation.
    */
-  private HoodieOperation operation;
+  protected HoodieOperation operation;
 
   public HoodieRecord(HoodieKey key, T data) {
     this(key, data, null);
@@ -175,7 +175,7 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
     return operation;
   }
 
-  public abstract Comparable<?> getOrderingValue(Properties props);
+  public abstract Comparable<?> getOrderingValue(Schema recordSchema, Properties props);
 
   public T getData() {
     if (data == null) {
@@ -202,10 +202,6 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
     return this;
   }
 
-  public void setData(T data) {
-    this.data = data;
-  }
-
   public HoodieRecordLocation getCurrentLocation() {
     return currentLocation;
   }
@@ -268,9 +264,9 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
 
   public abstract HoodieRecordType getRecordType();
 
-  public abstract String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt);
+  public abstract String getRecordKey(Schema recordSchema, Option<BaseKeyGenerator> keyGeneratorOpt);
 
-  public abstract String getRecordKey(String keyFieldName);
+  public abstract String getRecordKey(Schema recordSchema, String keyFieldName);
 
   public void seal() {
     this.sealed = true;
@@ -288,13 +284,14 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
 
   /**
    * Get column in record to support RDDCustomColumnsSortPartitioner
+   * @return the values of the requested columns
    */
-  public abstract Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);
+  public abstract Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);
 
   /**
    * Support bootstrap.
    */
-  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException;
+  public abstract HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException;
 
   /**
    * Rewrite record into new schema(add meta columns)
@@ -314,16 +311,19 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
    * This method could change in the future.
    * @temporary
    */
-  public abstract HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException;
+  public abstract HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException;
 
-  public abstract boolean isDelete(Schema schema, Properties props) throws IOException;
+  public abstract boolean isDelete(Schema recordSchema, Properties props) throws IOException;
 
   /**
    * Is EmptyRecord. Generated by ExpressionPayload.
    */
-  public abstract boolean shouldIgnore(Schema schema, Properties props) throws IOException;
+  public abstract boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException;
 
-  public abstract Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props) throws IOException;
+  /**
+   * Returns a copy of this record so that cached instances do not share mutable state with the underlying reader buffers.
+   */
+  public abstract HoodieRecord<T> copy();
 
   public abstract Option<Map<String, String>> getMetadata();
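
A short sketch of how the widened getColumnValues contract above can be consumed to build a sort key, in the spirit of the RDDCustomColumnsSortPartitioner mentioned in its javadoc (record, schema and sortColumns are illustrative inputs, not part of this patch):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieRecord;

    class SortKeySketch {
      // Concatenates the requested column values into a single sort key string.
      static String sortKey(HoodieRecord<?> record, Schema schema, String[] sortColumns) {
        Object[] values = record.getColumnValues(schema, sortColumns, false);
        StringBuilder key = new StringBuilder();
        for (Object value : values) {
          key.append(value == null ? "" : value.toString()).append('_');
        }
        return key.toString();
      }
    }
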
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
index c2eb164dca..540244009d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
@@ -31,7 +31,7 @@ public interface HoodieRecordCompatibilityInterface {
    * This method used to extract HoodieKey not through keyGenerator.
    */
   HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
-      Schema schema,
+      Schema recordSchema,
       Properties props,
       Option<Pair<String, String>> simpleKeyGenFieldsOpt,
       Boolean withOperation,
@@ -41,5 +41,12 @@ public interface HoodieRecordCompatibilityInterface {
   /**
    * This method used to extract HoodieKey through keyGenerator. This method used in ClusteringExecutionStrategy.
    */
-  HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Properties props, Option<BaseKeyGenerator> keyGen);
+  HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, Properties props, Option<BaseKeyGenerator> keyGen);
+
+  /**
+   * This method is used to overwrite the record key field.
+   */
+  HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) throws IOException;
+
+  Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) throws IOException;
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 43665b5711..4077752831 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -23,6 +23,7 @@ import org.apache.avro.Schema;
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.Option;
 
 import java.io.IOException;
@@ -37,12 +38,14 @@ import java.util.Properties;
 @PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
 public interface HoodieRecordMerger extends Serializable {
 
+  String DEFAULT_MERGER_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
+
   /**
    * This method converges combineAndGetUpdateValue and precombine from HoodiePayload.
    * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
    * of the single record, both orders of operations applications have to yield the same result)
    */
-  Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
+  Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException;
 
   /**
    * The record type handled by the current merger.
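
A hedged illustration of consuming the new Pair-returning contract (mirroring the HoodieMergedLogRecordScanner change later in this diff); the variable names are illustrative only:

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordMerger;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;

    class MergeResultSketch {
      // Returns the surviving record, or null when the merge resolved to a delete.
      static HoodieRecord resolve(HoodieRecordMerger merger, HoodieRecord older, HoodieRecord newer,
                                  Schema readerSchema, Properties props) throws IOException {
        Option<Pair<HoodieRecord, Schema>> merged =
            merger.merge(older, readerSchema, newer, readerSchema, props);
        // The right element carries the schema of the surviving record, which may
        // differ from readerSchema when the old and new records use different schemas.
        return merged.isPresent() ? merged.get().getLeft() : null;
      }
    }
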
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
new file mode 100644
index 0000000000..a4ff9a1896
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/MetadataValues.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hudi.common.util.ValidationUtils;
+
+public class MetadataValues {
+  private Map<String, String> kv;
+
+  public MetadataValues(Map<String, String> kv) {
+    ValidationUtils.checkArgument(HOODIE_META_COLUMNS_WITH_OPERATION.containsAll(kv.keySet()));
+    this.kv = kv;
+  }
+
+  public MetadataValues() {
+    this.kv = new HashMap<>();
+  }
+
+  public void setCommitTime(String value) {
+    this.kv.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, value);
+  }
+
+  public void setCommitSeqno(String value) {
+    this.kv.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, value);
+  }
+
+  public void setRecordKey(String value) {
+    this.kv.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, value);
+  }
+
+  public void setPartitionPath(String value) {
+    this.kv.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, value);
+  }
+
+  public void setFileName(String value) {
+    this.kv.put(HoodieRecord.FILENAME_METADATA_FIELD, value);
+  }
+
+  public void setOperation(String value) {
+    this.kv.put(HoodieRecord.OPERATION_METADATA_FIELD, value);
+  }
+
+  public Map<String, String> getKv() {
+    return kv;
+  }
+}
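
A brief example of how the new MetadataValues container replaces the old Map<String, String> argument of updateMetadataValues (the record and schema variables are placeholders for illustration):

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.MetadataValues;

    class MetadataValuesSketch {
      // Stamps commit metadata onto a record using the typed setters instead of
      // raw metadata-field map keys.
      static HoodieRecord stampCommit(HoodieRecord record, Schema schema,
                                      String commitTime, String fileName) throws IOException {
        MetadataValues metadataValues = new MetadataValues();
        metadataValues.setCommitTime(commitTime);
        metadataValues.setFileName(fileName);
        return record.updateMetadataValues(schema, new Properties(), metadataValues);
      }
    }
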
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index d21a46e680..013465ffa6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.config.OrderedProperties;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -174,8 +175,8 @@ public class HoodieTableConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
       .key("hoodie.compaction.merger.strategy")
-      .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
-      .withDocumentation("Id of merger strategy.  Hudi will pick RecordMergers in hoodie.datasource.write.merger.impls which has the same merger strategy id");
+      .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+      .withDocumentation("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in hoodie.datasource.write.merger.impls which has the same merger strategy id");
 
   public static final ConfigProperty<String> ARCHIVELOG_FOLDER = ConfigProperty
       .key("hoodie.archivelog.folder")
@@ -259,7 +260,7 @@ public class HoodieTableConfig extends HoodieConfig {
 
   private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // <database_name>.<table_name>
 
-  public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName, String mergerStrategy) {
+  public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName, String mergerStrategyId) {
     super();
     Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
     LOG.info("Loading table properties from " + propertyPath);
@@ -272,8 +273,8 @@ public class HoodieTableConfig extends HoodieConfig {
         needStore = true;
       }
       if (contains(MERGER_STRATEGY) && payloadClassName != null
-          && !getString(MERGER_STRATEGY).equals(mergerStrategy)) {
-        setValue(MERGER_STRATEGY, mergerStrategy);
+          && !getString(MERGER_STRATEGY).equals(mergerStrategyId)) {
+        setValue(MERGER_STRATEGY, mergerStrategyId);
         needStore = true;
       }
       if (needStore) {
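
For context, a sketch of how the renamed strategy id and the merger implementation list are expected to fit together in writer configuration; the exact write-path wiring is outside this hunk and the values shown are only the defaults:

    import java.util.Properties;
    import org.apache.hudi.common.model.HoodieAvroRecordMerger;
    import org.apache.hudi.common.model.HoodieRecordMerger;

    class MergerConfigSketch {
      static Properties mergerProps() {
        Properties props = new Properties();
        // Candidate merger implementations; Hudi picks the ones whose strategy id
        // matches hoodie.compaction.merger.strategy.
        props.setProperty("hoodie.datasource.write.merger.impls", HoodieAvroRecordMerger.class.getName());
        props.setProperty("hoodie.compaction.merger.strategy", HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
        return props;
      }
    }
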
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index c7d87a06b7..8a4901019c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -162,6 +163,8 @@ public abstract class AbstractHoodieLogRecordReader {
     HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig();
     this.payloadClassFQN = tableConfig.getPayloadClass();
     this.preCombineField = tableConfig.getPreCombineField();
+    // The log scanner merges log records with precombine (de-duping) semantics enabled
+    this.payloadProps.setProperty(HoodieAvroRecordMerger.DE_DUPING, "true");
     if (this.preCombineField != null) {
       this.payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, this.preCombineField);
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 0ad1eea28c..42294c3d11 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -131,6 +131,10 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     return records;
   }
 
+  public HoodieRecordType getRecordType() {
+    return recordMerger.getRecordType();
+  }
+
   public long getNumMergedRecordsInLog() {
     return numMergedRecordsInLog;
   }
@@ -143,23 +147,22 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
   }
 
   @Override
-  protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws IOException {
-    String key = hoodieRecord.getRecordKey();
+  protected <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
+    String key = newRecord.getRecordKey();
     if (records.containsKey(key)) {
       // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
       // done when a DELETE (empty payload) is encountered before or after an insert/update.
 
       HoodieRecord<T> oldRecord = records.get(key);
       T oldValue = oldRecord.getData();
-      T combinedValue = ((HoodieRecord<T>) recordMerger.merge(oldRecord, hoodieRecord, readerSchema, this.getPayloadProps()).get()).getData();
+      HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(oldRecord, readerSchema, newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
       // If combinedValue is oldValue, no need rePut oldRecord
-      if (combinedValue != oldValue) {
-        hoodieRecord.setData(combinedValue);
-        records.put(key, hoodieRecord);
+      if (combinedRecord.getData() != oldValue) {
+        records.put(key, combinedRecord.copy());
       }
     } else {
       // Put the record as is
-      records.put(key, hoodieRecord);
+      records.put(key, newRecord.copy());
     }
   }
 
@@ -172,8 +175,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
       // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
       // For same ordering values, uses the natural order(arrival time semantics).
 
-      Comparable curOrderingVal = oldRecord.getOrderingValue(
-          this.hoodieTableMetaClient.getTableConfig().getProps());
+      Comparable curOrderingVal = oldRecord.getOrderingValue(this.readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps());
       Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
       // Checks the ordering value does not equal to 0
       // because we use 0 as the default value which means natural order
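
A compact sketch of the keep-or-delete rule described in the comment above; the method name and raw Comparable parameters are illustrative, not code from this patch:

    class DeleteOrderingSketch {
      @SuppressWarnings({"rawtypes", "unchecked"})
      static boolean keepExistingRecord(Comparable curOrderingVal, Comparable deleteOrderingVal) {
        // An ordering value of 0 is the default and means natural (arrival) order,
        // in which case the later DELETE wins; otherwise the existing record is kept
        // only when its ordering value is strictly greater than the DELETE's.
        return !deleteOrderingVal.equals(0) && curOrderingVal.compareTo(deleteOrderingVal) > 0;
      }
    }
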
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 272a3a8d8a..f93947ea0b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -55,7 +55,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
   @Override
   protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws Exception {
     // Just call callback without merging
-    callback.apply(hoodieRecord);
+    callback.apply(hoodieRecord.copy());
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index ab669730f6..19ecdbf9ee 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -213,7 +213,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
   }
 
   protected Option<String> getRecordKey(HoodieRecord record) {
-    return Option.ofNullable(record.getRecordKey(keyFieldName));
+    return Option.ofNullable(record.getRecordKey(readerSchema, keyFieldName));
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 37db0caa9c..faa7856e43 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table.log.block;
 
+import java.util.Properties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -42,7 +43,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
@@ -58,7 +58,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.TreeMap;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -216,7 +215,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     Option<Schema.Field> keyField = getKeyField(schema);
     // Reset key value w/in the record to avoid duplicating the key w/in payload
     if (keyField.isPresent()) {
-      record.updateValues(schema, new Properties(), Collections.singletonMap(keyField.get().name(), StringUtils.EMPTY_STRING));
+      record.truncateRecordKey(schema, new Properties(), keyField.get().name());
     }
     return HoodieAvroUtils.recordToBytes(record, schema).get();
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 60aa6c43a9..8f70495323 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -18,10 +18,15 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.table.HoodieTableConfig;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 public class ConfigUtils {
 
@@ -52,4 +57,11 @@ public class ConfigUtils {
     }
     return payloadClass;
   }
+
+  public static List<String> getMergerImpls(Map<String, String> optParams) {
+    return Arrays.stream(
+            optParams.getOrDefault("hoodie.datasource.write.merger.impls",
+                HoodieAvroRecordMerger.class.getName()).split(","))
+        .map(String::trim).distinct().collect(Collectors.toList());
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 92b14ff340..666a084877 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -66,7 +66,7 @@ public class HoodieRecordUtils {
   /**
    * Instantiate a given class with a record merge.
    */
-  public static HoodieRecordMerger generateRecordMerger(String basePath, EngineType engineType,
+  public static HoodieRecordMerger createRecordMerger(String basePath, EngineType engineType,
       List<String> mergerClassList, String mergerStrategy) {
     if (mergerClassList.isEmpty() || HoodieTableMetadata.isMetadataTable(basePath)) {
       return HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
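
A sketch tying the new ConfigUtils.getMergerImpls helper (previous file) to the renamed createRecordMerger factory; EngineType.SPARK and its import path are assumptions for illustration:

    import java.util.List;
    import java.util.Map;
    import org.apache.hudi.common.engine.EngineType;  // assumed import path
    import org.apache.hudi.common.model.HoodieRecordMerger;
    import org.apache.hudi.common.util.ConfigUtils;
    import org.apache.hudi.common.util.HoodieRecordUtils;

    class RecordMergerFactorySketch {
      // Resolves the candidate merger classes from writer options and instantiates
      // the one whose strategy id matches mergerStrategyId.
      static HoodieRecordMerger resolve(String basePath, Map<String, String> optParams, String mergerStrategyId) {
        List<String> mergerImpls = ConfigUtils.getMergerImpls(optParams);
        return HoodieRecordUtils.createRecordMerger(basePath, EngineType.SPARK, mergerImpls, mergerStrategyId);
      }
    }
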
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
index ca8c3ab428..42ea766eb9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
@@ -24,7 +24,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.parquet.hadoop.ParquetReader;
 
 import java.io.IOException;
-import java.util.function.Function;
 
 /**
  * This class wraps a parquet reader and provides an iterator based api to read from a parquet file. This is used in
@@ -36,24 +35,17 @@ public class ParquetReaderIterator<T> implements ClosableIterator<T> {
   private final ParquetReader<T> parquetReader;
   // Holds the next entry returned by the parquet reader
   private T next;
-  // For directly use InternalRow
-  private Function<T, T> mapper;
 
   public ParquetReaderIterator(ParquetReader<T> parquetReader) {
     this.parquetReader = parquetReader;
   }
 
-  public ParquetReaderIterator(ParquetReader<T> parquetReader, Function<T, T> mapper) {
-    this.parquetReader = parquetReader;
-    this.mapper = mapper;
-  }
-
   @Override
   public boolean hasNext() {
     try {
       // To handle when hasNext() is called multiple times for idempotency and/or the first time
       if (this.next == null) {
-        this.next = read();
+        this.next = parquetReader.read();
       }
       return this.next != null;
     } catch (Exception e) {
@@ -72,7 +64,7 @@ public class ParquetReaderIterator<T> implements ClosableIterator<T> {
         }
       }
       T retVal = this.next;
-      this.next = read();
+      this.next = null;
       return retVal;
     } catch (Exception e) {
       FileIOUtils.closeQuietly(parquetReader);
@@ -80,15 +72,6 @@ public class ParquetReaderIterator<T> implements ClosableIterator<T> {
     }
   }
 
-  private T read() throws IOException {
-    T record = parquetReader.read();
-    if (mapper == null || record == null) {
-      return record;
-    } else {
-      return mapper.apply(record);
-    }
-  }
-
   public void close() {
     try {
       parquetReader.close();
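
With the mapper hook gone, the iterator is a thin wrapper over ParquetReader; a minimal consumption sketch, assuming the caller has already built a ParquetReader<GenericRecord>:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.util.ParquetReaderIterator;
    import org.apache.parquet.hadoop.ParquetReader;

    public class ParquetIterationExample {
      // 'reader' stands for an already-built ParquetReader<GenericRecord>.
      static void consume(ParquetReader<GenericRecord> reader) {
        ParquetReaderIterator<GenericRecord> it = new ParquetReaderIterator<>(reader);
        try {
          // hasNext() buffers one record until next() hands it out, so repeated calls are safe.
          while (it.hasNext()) {
            GenericRecord record = it.next();
            System.out.println(record);
          }
        } finally {
          it.close(); // also closes the wrapped ParquetReader
        }
      }
    }
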
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
index 6f3b3ef928..9041db5144 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
@@ -19,11 +19,8 @@
 package org.apache.hudi.common.util;
 
 import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import java.io.ByteArrayOutputStream;
@@ -39,14 +36,6 @@ public class SerializationUtils {
   private static final ThreadLocal<KryoSerializerInstance> SERIALIZER_REF =
       ThreadLocal.withInitial(KryoSerializerInstance::new);
 
-  private static Pair<String, Serializer<?>> SERIALIZER_REGISTER = null;
-
-  public static void setOverallRegister(String className, Serializer<?> serializer) {
-    if (SERIALIZER_REGISTER == null) {
-      SERIALIZER_REGISTER = Pair.of(className, serializer);
-    }
-  }
-
   // Serialize
   // -----------------------------------------------------------------------
 
@@ -132,13 +121,6 @@ public class SerializationUtils {
       // Handle cases where we may have an odd classloader setup like with libjars
       // for hadoop
       kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
-      if (SERIALIZER_REGISTER != null) {
-        try {
-          kryo.register(Class.forName(SERIALIZER_REGISTER.getLeft()), SERIALIZER_REGISTER.getRight());
-        } catch (ClassNotFoundException e) {
-          throw new HoodieException(e);
-        }
-      }
       return kryo;
     }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
index 9499954911..a4f2c62437 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java
@@ -33,8 +33,6 @@ public class StringUtils {
 
   public static final String EMPTY_STRING = "";
 
-  public static final String DEFAULT_MERGER_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
-
   /**
    * <p>
    * Joins the elements of the provided array into a single String containing the provided list of elements.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatLists.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatLists.java
new file mode 100644
index 0000000000..e3bf9215f5
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatLists.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Space-efficient, comparable, immutable lists, copied from Apache Calcite core.
+ */
+public class FlatLists {
+  private FlatLists() {
+  }
+
+  /**
+   * Creates a memory-, CPU- and cache-efficient immutable list from an
+   * existing list. The list is always copied.
+   *
+   * @param t Array of members of list
+   * @param <T> Element type
+   * @return List containing the given members
+   */
+  public static <T> List<T> of(List<T> t) {
+    return of_(t);
+  }
+
+  public static <T extends Comparable> ComparableList<T> ofComparable(List<T> t) {
+    return of_(t);
+  }
+
+  public static <T extends Comparable> ComparableList<T> ofComparableArray(Object[] t) {
+    return ofComparable(Arrays.stream(t).map(v -> (T)v).collect(Collectors.toList()));
+  }
+
+  private static <T> ComparableList<T> of_(List<T> t) {
+    return new ComparableListImpl(new ArrayList<>(t));
+  }
+
+
+  /** List that is also comparable.
+   *
+   * <p>You can create an instance whose type
+   * parameter {@code T} does not extend {@link Comparable}, but you will get a
+   * {@link ClassCastException} at runtime when you call
+   * {@link #compareTo(Object)} if the elements of the list do not implement
+   * {@code Comparable}.
+   */
+  public interface ComparableList<T> extends List<T>, Comparable<List> {
+  }
+
+  /** Wrapper around a list that makes it implement the {@link Comparable}
+   * interface using lexical ordering. The elements must be comparable. */
+  static class ComparableListImpl<T extends Comparable<T>>
+      extends AbstractList<T>
+      implements ComparableList<T>, KryoSerializable {
+    private List<T> list;
+
+    protected ComparableListImpl(List<T> list) {
+      this.list = list;
+    }
+
+    public T get(int index) {
+      return list.get(index);
+    }
+
+    public int size() {
+      return list.size();
+    }
+
+    public int compareTo(List o) {
+      return compare(list, o);
+    }
+
+    static <T extends Comparable<T>> int compare(List<T> list0, List<T> list1) {
+      final int size0 = list0.size();
+      final int size1 = list1.size();
+      if (size1 == size0) {
+        return compare(list0, list1, size0);
+      }
+      final int c = compare(list0, list1, Math.min(size0, size1));
+      if (c != 0) {
+        return c;
+      }
+      return size0 - size1;
+    }
+
+    static <T extends Comparable<T>> int compare(List<T> list0, List<T> list1, int size) {
+      for (int i = 0; i < size; i++) {
+        Comparable o0 = list0.get(i);
+        Comparable o1 = list1.get(i);
+        int c = compare(o0, o1);
+        if (c != 0) {
+          return c;
+        }
+      }
+      return 0;
+    }
+
+    static <T extends Comparable<T>> int compare(T a, T b) {
+      if (a == b) {
+        return 0;
+      }
+      if (a == null) {
+        return -1;
+      }
+      if (b == null) {
+        return 1;
+      }
+      return a.compareTo(b);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output) {
+      kryo.writeClassAndObject(output, list);
+    }
+
+    @Override
+    public void read(Kryo kryo, Input input) {
+      list = (List<T>) kryo.readClassAndObject(input);
+    }
+  }
+
+}
+
+
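
A small sketch of the lexical ordering the new FlatLists utility provides; the values are illustrative only:

    import java.util.Arrays;

    import org.apache.hudi.common.util.collection.FlatLists;
    import org.apache.hudi.common.util.collection.FlatLists.ComparableList;

    public class FlatListsExample {
      public static void main(String[] args) {
        ComparableList<Integer> a = FlatLists.ofComparable(Arrays.asList(1, 2));
        ComparableList<Integer> b = FlatLists.ofComparable(Arrays.asList(1, 3));
        ComparableList<Integer> c = FlatLists.ofComparable(Arrays.asList(1, 2, 0));

        System.out.println(a.compareTo(b) < 0); // true: 2 < 3 at the first differing position
        System.out.println(a.compareTo(c) < 0); // true: equal prefix, the shorter list sorts first
      }
    }
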
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java
index a194e2fc6a..3834574cd0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java
@@ -26,17 +26,6 @@ import java.io.IOException;
 
 public class HoodieAvroFileReaderFactory extends HoodieFileReaderFactory {
 
-  private static class SingletonHolder {
-    private static final HoodieAvroFileReaderFactory INSTANCE = new HoodieAvroFileReaderFactory();
-  }
-
-  private HoodieAvroFileReaderFactory() {
-  }
-
-  public static HoodieFileReaderFactory getFileReaderFactory() {
-    return SingletonHolder.INSTANCE;
-  }
-
   protected HoodieFileReader newParquetFileReader(Configuration conf, Path path) {
     return new HoodieAvroParquetReader(conf, path);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriter.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriter.java
index bb046c2395..e53a7b095f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriter.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 
@@ -42,13 +41,13 @@ public interface HoodieAvroFileWriter extends HoodieFileWriter {
 
   @Override
   default void writeWithMetadata(HoodieKey key, HoodieRecord record, Schema schema, Properties props) throws IOException {
-    IndexedRecord avroPayload = ((HoodieAvroIndexedRecord) record.toIndexedRecord(schema, props).get()).getData();
+    IndexedRecord avroPayload = record.toIndexedRecord(schema, props).get().getData();
     writeAvroWithMetadata(key, avroPayload);
   }
 
   @Override
   default void write(String recordKey, HoodieRecord record, Schema schema, Properties props) throws IOException {
-    IndexedRecord avroPayload = ((HoodieAvroIndexedRecord) record.toIndexedRecord(schema, props).get()).getData();
+    IndexedRecord avroPayload = record.toIndexedRecord(schema, props).get().getData();
     writeAvro(recordKey, avroPayload);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
index 0ab052a877..86d57fde73 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
@@ -44,17 +44,6 @@ import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN;
 
 public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
 
-  private static class SingletonHolder {
-    private static final HoodieAvroFileWriterFactory INSTANCE = new HoodieAvroFileWriterFactory();
-  }
-
-  private HoodieAvroFileWriterFactory() {
-  }
-
-  public static HoodieFileWriterFactory getFileReaderFactory() {
-    return HoodieAvroFileWriterFactory.SingletonHolder.INSTANCE;
-  }
-
   protected HoodieFileWriter newParquetFileWriter(
       String instantTime, Path path, Configuration conf, HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index f84ec530ad..24164a146e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -28,8 +28,6 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
 import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
@@ -40,13 +38,12 @@ public class HoodieFileReaderFactory {
   public static HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
     switch (recordType) {
       case AVRO:
-        return HoodieAvroFileReaderFactory.getFileReaderFactory();
+        return new HoodieAvroFileReaderFactory();
       case SPARK:
         try {
           Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
-          Method method = clazz.getMethod("getFileReaderFactory", null);
-          return (HoodieFileReaderFactory) method.invoke(null,null);
-        } catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
+          return (HoodieFileReaderFactory) clazz.newInstance();
+        } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
           throw new HoodieException("Unable to create hoodie spark file writer factory", e);
         }
       default:
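
After removing the singleton holders, each call simply constructs a fresh factory; a minimal sketch (the SPARK branch still needs the Spark module on the classpath):

    import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;

    public class ReaderFactoryExample {
      public static void main(String[] args) {
        // AVRO: a plain 'new HoodieAvroFileReaderFactory()' under the hood.
        HoodieFileReaderFactory avroFactory =
            HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO);

        // SPARK: HoodieSparkFileReaderFactory is loaded reflectively and created via its
        // no-arg constructor, so the Spark module must be on the classpath.
        HoodieFileReaderFactory sparkFactory =
            HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.SPARK);

        System.out.println(avroFactory.getClass() + " / " + sparkFactory.getClass());
      }
    }
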
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 99f35d7a0f..456383d374 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -36,8 +36,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
 import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
@@ -48,13 +46,12 @@ public class HoodieFileWriterFactory {
   private static HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
     switch (recordType) {
       case AVRO:
-        return HoodieAvroFileWriterFactory.getFileReaderFactory();
+        return new HoodieAvroFileWriterFactory();
       case SPARK:
         try {
           Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
-          Method method = clazz.getMethod("getFileWriterFactory", null);
-          return (HoodieFileWriterFactory) method.invoke(null, null);
-        } catch (NoSuchMethodException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+          return (HoodieFileWriterFactory) clazz.newInstance();
+        } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
           throw new HoodieException("Unable to create hoodie spark file writer factory", e);
         }
       default:
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 42b0c40a83..e6ef39bf8c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -25,9 +25,9 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -129,8 +129,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> RECORD_MERGER_STRATEGY = ConfigOptions
       .key("record.merger.strategy")
       .stringType()
-      .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
-      .withDescription("Id of merger strategy.  Hudi will pick RecordMergers in record.merger.impls which has the same merger strategy id");
+      .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+      .withDescription("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in record.merger.impls that have the same merger strategy id");
 
   public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
       .key("partition.default_name")
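
A hedged sketch of wiring these options on the Flink side; the record.merger.impls key string is the one referenced in the description above, and the values are illustrative:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.common.model.HoodieAvroRecordMerger;
    import org.apache.hudi.configuration.FlinkOptions;

    public class FlinkMergerOptionsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Implementations to choose from; Hudi keeps those whose strategy id matches below.
        conf.setString("record.merger.impls", HoodieAvroRecordMerger.class.getName());
        conf.setString(FlinkOptions.RECORD_MERGER_STRATEGY.key(),
            FlinkOptions.RECORD_MERGER_STRATEGY.defaultValue());
        System.out.println(conf);
      }
    }
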
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index ba63b982f3..ed4f1b02e6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -433,7 +433,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
       Properties props = new Properties();
       config.addAllToProperties(props);
-      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger);
+      records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
+          .deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger);
     }
     bucket.preWrite(records);
     final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
@@ -470,7 +471,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
               if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
                 Properties props = new Properties();
                 config.addAllToProperties(props);
-                records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger);
+                records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
+                    .deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger);
               }
               bucket.preWrite(records);
               writeStatus.addAll(writeFunction.apply(records, currentInstant));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 517e57be26..519fad1159 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -21,6 +21,7 @@ package org.apache.hudi.streamer;
 import org.apache.hudi.client.utils.OperationConverter;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.StringUtils;
@@ -124,8 +125,8 @@ public class FlinkStreamerConfig extends Configuration {
       + "Hudi will pick most efficient implementation to perform merging/combining of the records (during update, reading MOR table, etc)")
   public String mergerImpls = HoodieAvroRecordMerger.class.getName();
 
-  @Parameter(names = {"--merger-strategy"}, description = "Id of merger strategy. Hudi will pick RecordMergers in merger-impls which has the same merger strategy id")
-  public String mergerStrategy = StringUtils.DEFAULT_MERGER_STRATEGY_UUID;
+  @Parameter(names = {"--merger-strategy"}, description = "Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in merger-impls that have the same merger strategy id")
+  public String mergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
 
   @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
       + "is purely new data/inserts to gain speed).", converter = OperationConverter.class)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index b060505bb3..7abf304034 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -178,7 +178,7 @@ public class FormatUtils {
         .map(String::trim)
         .distinct()
         .collect(Collectors.toList());
-    HoodieRecordMerger merger = HoodieRecordUtils.generateRecordMerger(
+    HoodieRecordMerger merger = HoodieRecordUtils.createRecordMerger(
         split.getTablePath(), EngineType.FLINK, mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY));
     FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
     return HoodieUnMergedLogRecordScanner.newBuilder()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 5364f6d96b..e9accb0a86 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table.format.mor;
 
-import java.util.stream.Collectors;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -30,6 +29,7 @@ import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -65,6 +65,7 @@ import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.stream.Collectors;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -764,7 +765,7 @@ public class MergeOnReadInputFormat
           .map(String::trim)
           .distinct()
           .collect(Collectors.toList());
-      this.recordMerger = HoodieRecordUtils.generateRecordMerger(split.getTablePath(), EngineType.FLINK, mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY));
+      this.recordMerger = HoodieRecordUtils.createRecordMerger(split.getTablePath(), EngineType.FLINK, mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY));
     }
 
     @Override
@@ -854,7 +855,7 @@ public class MergeOnReadInputFormat
       final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey);
       GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
       HoodieAvroIndexedRecord hoodieAvroIndexedRecord = new HoodieAvroIndexedRecord(historyAvroRecord);
-      Option<HoodieRecord> resultRecord = recordMerger.merge(hoodieAvroIndexedRecord, record, tableSchema, payloadProps);
+      Option<HoodieRecord> resultRecord = recordMerger.merge(hoodieAvroIndexedRecord, tableSchema, record, tableSchema, payloadProps).map(Pair::getLeft);
       return resultRecord.get().toIndexedRecord(tableSchema, new Properties());
     }
   }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
index 7e571720c1..a9cf806b19 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
@@ -19,11 +19,11 @@
 package org.apache.hudi.hadoop;
 
 import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -160,7 +160,7 @@ public class TestInputPathHandler {
     properties.setProperty(HoodieTableConfig.NAME.key(), tableName);
     properties.setProperty(HoodieTableConfig.TYPE.key(), tableType.name());
     properties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), HoodieAvroPayload.class.getName());
-    properties.setProperty(HoodieTableConfig.MERGER_STRATEGY.key(), StringUtils.DEFAULT_MERGER_STRATEGY_UUID);
+    properties.setProperty(HoodieTableConfig.MERGER_STRATEGY.key(), HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
index 051a109a26..04daafbab0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
@@ -21,8 +21,8 @@ package org.apache.hudi;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 
 import org.apache.avro.Schema;
@@ -34,11 +34,11 @@ public class HoodieSparkRecordMerger implements HoodieRecordMerger {
 
   @Override
   public String getMergingStrategy() {
-    return StringUtils.DEFAULT_MERGER_STRATEGY_UUID;
+    return HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
   }
 
   @Override
-  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Properties props) throws IOException {
     ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
     ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
 
@@ -48,12 +48,12 @@ public class HoodieSparkRecordMerger implements HoodieRecordMerger {
     }
     if (older.getData() == null) {
       // use natural order for delete record
-      return Option.of(newer);
+      return Option.of(Pair.of(newer, newSchema));
     }
-    if (older.getOrderingValue(props).compareTo(newer.getOrderingValue(props)) > 0) {
-      return Option.of(older);
+    if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+      return Option.of(Pair.of(older, oldSchema));
     } else {
-      return Option.of(newer);
+      return Option.of(Pair.of(newer, newSchema));
     }
   }
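
For callers, the visible change is the Pair-returning merge signature; a minimal sketch of unwrapping the result the same way the MergeOnReadInputFormat change above does:

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordMerger;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;

    public class MergeCallExample {
      // Sketch only: records, schemas and props come from the caller.
      static Option<HoodieRecord> mergeLatest(HoodieRecordMerger merger,
                                              HoodieRecord older, Schema oldSchema,
                                              HoodieRecord newer, Schema newSchema,
                                              Properties props) throws IOException {
        // merge(...) now also reports the schema of the winning record; callers that only
        // need the record itself project it away with Pair::getLeft.
        Option<Pair<HoodieRecord, Schema>> merged =
            merger.merge(older, oldSchema, newer, newSchema, props);
        return merged.map(Pair::getLeft);
      }
    }
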
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index ece56a5159..53380c9aa6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -79,7 +79,7 @@ case class HoodieTableState(tablePath: String,
                             usesVirtualKeys: Boolean,
                             recordPayloadClassName: String,
                             metadataConfig: HoodieMetadataConfig,
-                            mergerImpls: String,
+                            mergerImpls: List[String],
                             mergerStrategy: String)
 
 /**
@@ -461,6 +461,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   }
 
   protected def getTableState: HoodieTableState = {
+    val mergerImpls = ConfigUtils.getMergerImpls(optParams.asJava).asScala.toList
+    val mergerStrategy = optParams.getOrElse(HoodieWriteConfig.MERGER_STRATEGY.key(),
+      sqlContext.getConf(HoodieWriteConfig.MERGER_STRATEGY.key(), HoodieWriteConfig.MERGER_STRATEGY.defaultValue()))
+
     // Subset of the state of table's configuration as of at the time of the query
     HoodieTableState(
       tablePath = basePath,
@@ -470,10 +474,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       usesVirtualKeys = !tableConfig.populateMetaFields(),
       recordPayloadClassName = tableConfig.getPayloadClass,
       metadataConfig = fileIndex.metadataConfig,
-      mergerImpls = optParams.getOrElse(HoodieWriteConfig.MERGER_IMPLS.key(),
-        HoodieWriteConfig.MERGER_IMPLS.defaultValue()),
-      mergerStrategy = optParams.getOrElse(HoodieWriteConfig.MERGER_STRATEGY.key(),
-        metaClient.getTableConfig.getMergerStrategy)
+      mergerImpls = mergerImpls,
+      mergerStrategy = mergerStrategy
     )
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f18ef4fc13..1b8cb57936 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -295,8 +295,7 @@ object HoodieSparkSqlWriter {
               tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
             )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
             val writeConfig = client.getConfig
-            if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ &&
-              writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+            if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
               throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
             }
             // Create a HoodieWriteClient & issue the write.
@@ -837,7 +836,7 @@ object HoodieSparkSqlWriter {
     }
   }
 
-  private def createHoodieRecordRdd(df: DataFrame, config: HoodieWriteConfig, parameters: Map[String, String], schema: Schema): JavaRDD[HoodieRecord[_]] = {
+  private def createHoodieRecordRdd(df: DataFrame, config: HoodieWriteConfig, parameters: Map[String, String], schema: Schema) = {
     val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
     val tblName = config.getString(HoodieWriteConfig.TBL_NAME)
     val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
@@ -849,7 +848,9 @@ object HoodieSparkSqlWriter {
     val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps))
     val partitionCols = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
     val dropPartitionColumns = config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
-    config.getRecordMerger.getRecordType match {
+    val recordType = config.getRecordMerger.getRecordType
+    log.debug(s"Using record type $recordType for the write")
+    recordType match {
       case HoodieRecord.HoodieRecordType.AVRO =>
         val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
           org.apache.hudi.common.util.Option.of(schema))
@@ -873,15 +874,12 @@ object HoodieSparkSqlWriter {
         // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
         val structType = HoodieInternalRowUtils.getCachedSchema(schema)
-        val structTypeBC = SparkContext.getOrCreate().broadcast(structType)
-        HoodieInternalRowUtils.addCompressedSchema(structType)
         df.queryExecution.toRdd.map(row => {
           val internalRow = row.copy()
-          val (processedRow, writeSchema) = getSparkProcessedRecord(partitionCols, internalRow, dropPartitionColumns, structTypeBC.value)
-          val recordKey = sparkKeyGenerator.getRecordKey(internalRow, structTypeBC.value)
-          val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structTypeBC.value)
+          val (processedRow, writeSchema) = getSparkProcessedRecord(partitionCols, internalRow, dropPartitionColumns, structType)
+          val recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType)
+          val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType)
           val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-          HoodieInternalRowUtils.addCompressedSchema(structTypeBC.value)
 
           new HoodieSparkRecord(key, processedRow, writeSchema)
         }).toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
index 87758ebd3c..f5a3834304 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
@@ -21,7 +21,7 @@ package org.apache.hudi
 import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection}
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
-import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieEmptyRecord, HoodieLogFile, HoodieRecord, HoodieRecordPayload}
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieEmptyRecord, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.config.HoodiePayloadConfig
 import org.apache.hudi.commmon.model.HoodieSparkRecord
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
@@ -32,22 +32,20 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
-import org.apache.hudi.common.util.{HoodieRecordUtils, SerializationUtils}
+import org.apache.hudi.common.util.HoodieRecordUtils
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
 import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
-import org.apache.avro.{Schema, SchemaNormalization}
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.hudi.SparkStructTypeSerializer
 import org.apache.spark.sql.HoodieCatalystExpressionUtils
 import org.apache.spark.sql.types.StructType
 import java.io.Closeable
-import java.nio.charset.StandardCharsets
 import java.util.Properties
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import scala.annotation.tailrec
@@ -201,9 +199,7 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
 
   private val baseFileIterator = baseFileReader(split.dataFile.get)
 
-  private val mergerList = tableState.mergerImpls.split(",")
-    .map(_.trim).distinct.toList.asJava
-  private val recordMerger = HoodieRecordUtils.generateRecordMerger(tableState.tablePath, EngineType.SPARK, mergerList, tableState.mergerStrategy)
+  private val recordMerger = HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, tableState.mergerImpls.asJava, tableState.mergerStrategy)
 
   override def hasNext: Boolean = hasNextInternal
 
@@ -243,15 +239,18 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
     recordMerger.getRecordType match {
       case HoodieRecordType.SPARK =>
         val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema)
-        toScalaOption(recordMerger.merge(curRecord, newRecord, logFileReaderAvroSchema, payloadProps))
+        val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps)
+        toScalaOption(result)
           .map(r => {
-            val projection = HoodieInternalRowUtils.getCachedUnsafeProjection(r.asInstanceOf[HoodieSparkRecord].getStructType, structTypeSchema)
-            projection.apply(r.getData.asInstanceOf[InternalRow])
+            val schema = HoodieInternalRowUtils.getCachedSchema(r.getRight)
+            val projection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema, structTypeSchema)
+            projection.apply(r.getLeft.getData.asInstanceOf[InternalRow])
           })
       case _ =>
         val curRecord = new HoodieAvroIndexedRecord(serialize(curRow))
-        toScalaOption(recordMerger.merge(curRecord, newRecord, logFileReaderAvroSchema, payloadProps))
-        .map(r => deserialize(projectAvroUnsafe(r.toIndexedRecord(logFileReaderAvroSchema, new Properties()).get().getData.asInstanceOf[GenericRecord], avroSchema, reusableRecordBuilder)))
+        val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps)
+        toScalaOption(result)
+          .map(r => deserialize(projectAvroUnsafe(r.getLeft.toIndexedRecord(r.getRight, payloadProps).get.getData.asInstanceOf[GenericRecord], avroSchema, reusableRecordBuilder)))
     }
   }
 }
@@ -321,15 +320,9 @@ object LogFileIterator {
           getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))
       }
 
-      val mergerList = tableState.mergerImpls.split(",")
-        .map(_.trim).distinct.toList.asJava
-      val recordMerger = HoodieRecordUtils.generateRecordMerger(tableState.tablePath, EngineType.SPARK, mergerList, tableState.mergerStrategy)
+      val recordMerger = HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, tableState.mergerImpls.asJava, tableState.mergerStrategy)
       logRecordScannerBuilder.withRecordMerger(recordMerger)
 
-      if (recordMerger.getRecordType == HoodieRecordType.SPARK) {
-        registerStructTypeSerializerIfNeed(List(HoodieInternalRowUtils.getCachedSchema(logSchema)))
-      }
-
       logRecordScannerBuilder.build()
     }
   }
@@ -348,11 +341,4 @@ object LogFileIterator {
       .getOrElse(split.logFiles.head.getPath)
       .getParent
   }
-
-  private def registerStructTypeSerializerIfNeed(schemas: List[StructType]): Unit = {
-    val schemaMap = schemas.map(schema => (SchemaNormalization.fingerprint64(schema.json.getBytes(StandardCharsets.UTF_8)), schema))
-      .toMap
-    val serializer = new SparkStructTypeSerializer(schemaMap)
-    SerializationUtils.setOverallRegister(classOf[HoodieSparkRecord].getName, serializer)
-  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index ad60f6187b..7c996b10ce 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -24,13 +24,12 @@ import org.apache.hudi.HoodieConversionUtils._
 import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord, HoodieRecordPayload}
+import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord, HoodieRecordMerger, HoodieRecordPayload}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._
 import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
 import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
 import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
-import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -162,8 +161,8 @@ class HoodieCDCRDD(
         metaClient.getTableConfig.getPayloadClass,
         metadataConfig,
         // TODO support CDC with spark record
-        mergerImpls = classOf[HoodieAvroRecordMerger].getName,
-        mergerStrategy = StringUtils.DEFAULT_MERGER_STRATEGY_UUID
+        mergerImpls = List(classOf[HoodieAvroRecordMerger].getName),
+        mergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
       )
     }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 8cd77900c0..5fdaf81dd2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.avro.HoodieAvroUtils.getRootLevelFieldName
-import org.apache.hudi.common.model.HoodieAvroRecordMerger
+import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecordMerger}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
 import org.apache.spark.sql.SparkSession
@@ -70,17 +70,11 @@ object HoodieOptionConfig {
     .defaultValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue())
     .build()
 
-  val SQL_MERGER_IMPLS: HoodieSQLOption[String] = buildConf()
-    .withSqlKey("mergerImpls")
-    .withHoodieKey(DataSourceWriteOptions.MERGER_IMPLS.key)
-    .defaultValue(classOf[HoodieAvroRecordMerger].getName)
-    .build()
-
   val SQL_MERGER_STRATEGY: HoodieSQLOption[String] = buildConf()
     .withSqlKey("mergerStrategy")
     .withHoodieKey(DataSourceWriteOptions.MERGER_STRATEGY.key)
     .withTableConfigKey(HoodieTableConfig.MERGER_STRATEGY.key)
-    .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
+    .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
     .build()
 
   /**
@@ -199,7 +193,7 @@ object HoodieOptionConfig {
   // extract primaryKey, preCombineField, type options
   def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
     val sqlOptions = mappingTableConfigToSqlOption(options)
-    val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_MERGER_IMPLS.sqlKeyName) -- Set(SQL_MERGER_STRATEGY.sqlKeyName)
+    val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_MERGER_STRATEGY.sqlKeyName)
     sqlOptions.filterKeys(targetOptions.contains)
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 332455ea21..7ef0cbf51e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -66,6 +66,7 @@ trait ProvidesHoodieConfig extends Logging {
         RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
         TBL_NAME.key -> hoodieCatalogTable.tableName,
         PRECOMBINE_FIELD.key -> preCombineField,
+        MERGER_IMPLS.key -> hoodieProps.getString(HoodieWriteConfig.MERGER_IMPLS.key, HoodieWriteConfig.MERGER_IMPLS.defaultValue),
         HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
         URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
         KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
@@ -185,6 +186,7 @@ trait ProvidesHoodieConfig extends Logging {
         PRECOMBINE_FIELD.key -> preCombineField,
         PARTITIONPATH_FIELD.key -> partitionFieldsStr,
         PAYLOAD_CLASS_NAME.key -> payloadClassName,
+        MERGER_IMPLS.key -> hoodieProps.getString(HoodieWriteConfig.MERGER_IMPLS.key, HoodieWriteConfig.MERGER_IMPLS.defaultValue),
         ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
         HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index cad5499b35..371a2d7b41 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -41,8 +41,6 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.SparkSession;
@@ -96,13 +94,12 @@ public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBoots
       } else if (recordType == HoodieRecordType.SPARK) {
         SparkKeyGeneratorInterface sparkKeyGenerator = (SparkKeyGeneratorInterface) keyGenerator;
         StructType structType = inputDataset.schema();
-        Broadcast<StructType> structTypeBC = new JavaSparkContext(sparkSession.sparkContext()).broadcast(structType);
         return inputDataset.queryExecution().toRdd().toJavaRDD().map(row -> {
           InternalRow internalRow = row.copy();
-          String recordKey = sparkKeyGenerator.getRecordKey(internalRow, structTypeBC.value()).toString();
-          String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structTypeBC.value()).toString();
+          String recordKey = sparkKeyGenerator.getRecordKey(internalRow, structType).toString();
+          String partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structType).toString();
           HoodieKey key = new HoodieKey(recordKey, partitionPath);
-          return new HoodieSparkRecord(key, internalRow, structTypeBC.value());
+          return new HoodieSparkRecord(key, internalRow, structType);
         });
       } else {
         throw new UnsupportedOperationException(recordType.name());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
index 624033b67f..3fabdad4a2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
@@ -64,18 +64,6 @@ class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAf
     sparkSession.close()
   }
 
-  test("test merge") {
-    val data1 = sparkSession.sparkContext.parallelize(Seq(Row("like", 18)))
-    val data2 = sparkSession.sparkContext.parallelize(Seq(Row("like1", 181)))
-    val row1 = sparkSession.createDataFrame(data1, schema1).queryExecution.toRdd.first()
-    val row2 = sparkSession.createDataFrame(data2, schema2).queryExecution.toRdd.first()
-    val rowMerge = HoodieInternalRowUtils.stitchRecords(row1, schema1, row2, schema2, schemaMerge)
-    assert(rowMerge.get(0, StringType).toString.equals("like"))
-    assert(rowMerge.get(1, IntegerType) == 18)
-    assert(rowMerge.get(2, StringType).toString.equals("like1"))
-    assert(rowMerge.get(3, IntegerType) == 181)
-  }
-
   test("test rewrite") {
     val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18, "like1", 181)))
     val oldRow = sparkSession.createDataFrame(data, schemaMerge).queryExecution.toRdd.first()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 37d320e192..03dcd2db4d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -36,7 +36,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, QuickstartUtils, HoodieSparkRecordMerger}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieInternalRowUtils, HoodieSparkRecordMerger, QuickstartUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, concat, lit, udf}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index ed4eb373b5..be865476d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -23,8 +23,7 @@ import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{HoodieRecordPayload, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -36,8 +35,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
 import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkRecordMerger, SparkDatasetMixin}
-import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieInternalRowUtils, HoodieSparkRecordMerger, SparkDatasetMixin}
 import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
@@ -98,9 +96,12 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
     )
 
   @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
-  def testCount(recordType: HoodieRecordType) {
-    val (writeOpts, readOpts) = getOpts(recordType)
+  @CsvSource(Array("AVRO, AVRO, avro", "AVRO, SPARK, parquet", "SPARK, AVRO, parquet", "SPARK, SPARK, parquet"))
+  def testCount(readType: HoodieRecordType, writeType: HoodieRecordType, logType: String) {
+    var (_, readOpts) = getOpts(readType)
+    var (writeOpts, _) = getOpts(writeType)
+    readOpts = readOpts ++ Map(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> logType)
+    writeOpts = writeOpts ++ Map(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> logType)
 
     // First Operation:
     // Producing parquet files to three default partitions.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
index ebdb943ed8..3547e42148 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/ReadAndWriteWithoutAvroBenchmark.scala
@@ -19,9 +19,10 @@
 package org.apache.spark.sql.execution.benchmark
 
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.config.HoodieCompactionConfig
 import org.apache.hudi.{HoodieSparkRecordMerger, HoodieSparkUtils}
 
 import org.apache.spark.SparkConf
@@ -60,41 +61,53 @@ object ReadAndWriteWithoutAvroBenchmark extends HoodieBenchmarkBase {
     sparkConf
   }
 
-  private def createComplexDataFrame(rowNum: Long, colNum: Int): DataFrame = {
+  private def createComplexDataFrame(rowNum: Long): DataFrame = {
     var df = spark.range(0, rowNum).toDF("id")
       .withColumn("t1", lit(1))
       .withColumn("d1", lit(12.99d))
       .withColumn("s1", lit("s1"))
       .withColumn("s2", lit("s2"))
       .withColumn("s3", lit("s3"))
-    for (i <- 0 to colNum) {
+    for (i <- 0 to 1) {
       df = df.withColumn(s"struct$i", struct(col("s1").as("st1"), col("s2").as("st2"), col("s3").as("st3")))
         .withColumn(s"map$i", map(col("s1"), col("s2")))
-        .withColumn(s"array$i", split(col("s1"), " "))
+        .withColumn(s"array$i", array(col("s1")))
     }
     df
   }
 
-  private def prepareHoodieTable(tableName: String, path: String, tableType: String, mergerType: String, df: DataFrame): Unit = {
+  private def prepareHoodieTable(tableName: String, path: String, tableType: String, mergerImpl: String, df: DataFrame): Unit = {
     df.collect()
     df.createOrReplaceTempView("input_df")
     if (spark.catalog.tableExists(tableName)) {
       spark.sql(s"drop table if exists $tableName")
     }
+    spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} = $mergerImpl")
     spark.sql(
       s"""
-         |create table $tableName using hudi
+         |create table $tableName(
+         |id long,
+         |t1 int,
+         |d1 double,
+         |s1 string,
+         |s2 string,
+         |s3 string,
+         |struct0 struct<st1:string, st2:string, st3:string>,
+         |map0 map<string, string>,
+         |array0 array<string>,
+         |struct1 struct<st1:string, st2:string, st3:string>,
+         |map1 map<string, string>,
+         |array1 array<string>
+         |) using hudi
          |tblproperties(
          |  primaryKey = 'id',
          |  preCombineField = 's1',
          |  type = '$tableType',
-         |  ${HoodieWriteConfig.MERGER_IMPLS.key} = '$mergerType',
          |  ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = 'parquet',
          |  ${HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key()} = '10')
          |location '$path'
-         |As
-         |select * from input_df
    """.stripMargin)
+    spark.sql(s"insert overwrite table $tableName select * from input_df")
   }
 
   /**
@@ -106,7 +119,7 @@ object ReadAndWriteWithoutAvroBenchmark extends HoodieBenchmarkBase {
    *  org.apache.hudi.HoodieSparkRecordMerger                      12654          13924        1100          0.1       12653.8       1.3X
    */
   private def overwriteBenchmark(): Unit = {
-    val df = createComplexDataFrame(1000000, 1)
+    val df = createComplexDataFrame(1000000)
     val benchmark = new HoodieBenchmark("pref insert overwrite", 1000000, 3)
     Seq(classOf[HoodieAvroRecordMerger].getName, classOf[HoodieSparkRecordMerger].getName).zip(Seq(avroTable, sparkTable)).foreach {
       case (merger, tableName) => benchmark.addCase(merger) { _ =>
@@ -118,26 +131,42 @@ object ReadAndWriteWithoutAvroBenchmark extends HoodieBenchmarkBase {
     benchmark.run()
   }
 
+  /**
+   * Java HotSpot(TM) 64-Bit Server VM 1.8.0_211-b12 on Mac OS X 10.16
+   * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+   * pref upsert:                                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+   * -----------------------------------------------------------------------------------------------------------------------------------
+   * org.apache.hudi.common.model.HoodieAvroRecordMerger           6108           6383         257          0.0      610785.6       1.0X
+   * org.apache.hudi.HoodieSparkRecordMerger                       4833           5468         614          0.0      483300.0       1.3X
+   *
+   * Java HotSpot(TM) 64-Bit Server VM 1.8.0_211-b12 on Mac OS X 10.16
+   * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+   * pref read:                                           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+   * -----------------------------------------------------------------------------------------------------------------------------------
+   * org.apache.hudi.common.model.HoodieAvroRecordMerger            813            818           8          0.0       81302.1       1.0X
+   * org.apache.hudi.HoodieSparkRecordMerger                        604            616          18          0.0       60430.1       1.3X
+   */
   private def upsertThenReadBenchmark(): Unit = {
-    val avroMerger = classOf[HoodieAvroRecordMerger].getName
-    val sparkMerger = classOf[HoodieSparkRecordMerger].getName
-    val df = createComplexDataFrame(1000000, 1)
+    val avroMergerImpl = classOf[HoodieAvroRecordMerger].getName
+    val sparkMergerImpl = classOf[HoodieSparkRecordMerger].getName
+    val df = createComplexDataFrame(10000)
     withTempDir { avroPath =>
       withTempDir { sparkPath =>
-        val upsertBenchmark = new HoodieBenchmark("pref upsert", 1000000, 3)
-        prepareHoodieTable(avroTable, new Path(avroPath.getCanonicalPath, avroTable).toUri.toString, "mor", avroMerger, df)
-        prepareHoodieTable(sparkTable, new Path(sparkPath.getCanonicalPath, sparkTable).toUri.toString, "mor", sparkMerger, df)
-        df.createOrReplaceTempView("input_df")
-        Seq(avroMerger, sparkMerger).zip(Seq(avroTable, sparkTable)).foreach {
-          case (merger, tableName) => upsertBenchmark.addCase(merger) { _ =>
-            spark.sql(s"update $tableName set s1 = 's1_new' where id > 0")
+        val upsertBenchmark = new HoodieBenchmark("pref upsert", 10000, 3)
+        prepareHoodieTable(avroTable, new Path(avroPath.getCanonicalPath, avroTable).toUri.toString, "mor", avroMergerImpl, df)
+        prepareHoodieTable(sparkTable, new Path(sparkPath.getCanonicalPath, sparkTable).toUri.toString, "mor", sparkMergerImpl, df)
+        Seq(avroMergerImpl, sparkMergerImpl).zip(Seq(avroTable, sparkTable)).foreach {
+          case (mergerImpl, tableName) => upsertBenchmark.addCase(mergerImpl) { _ =>
+            spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} = $mergerImpl")
+            spark.sql(s"update $tableName set s1 = 's1_new_1' where id > 0")
           }
         }
         upsertBenchmark.run()
 
-        val readBenchmark = new HoodieBenchmark("pref read", 1000000, 3)
-        Seq(avroMerger, sparkMerger).zip(Seq(avroTable, sparkTable)).foreach {
-          case (merger, tableName) => readBenchmark.addCase(merger) { _ =>
+        val readBenchmark = new HoodieBenchmark("pref read", 10000, 3)
+        Seq(avroMergerImpl, sparkMergerImpl).zip(Seq(avroTable, sparkTable)).foreach {
+          case (mergerImpl, tableName) => readBenchmark.addCase(mergerImpl) { _ =>
+            spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} = $mergerImpl")
             spark.sql(s"select * from $tableName").collect()
           }
         }
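
For readers following the merger-related changes in this benchmark, the pattern it now exercises is selecting the record merger per SQL session (via HoodieWriteConfig.MERGER_IMPLS) instead of baking it into the table properties. A minimal sketch of that pattern is below; it assumes a Spark session with the Hudi SQL extensions enabled and an existing Hudi table named hudi_tbl with columns id and s1 (the table name is hypothetical, the config key and merger classes are the ones used in the diff above).

    import org.apache.hudi.HoodieSparkRecordMerger
    import org.apache.hudi.common.model.HoodieAvroRecordMerger
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.spark.sql.SparkSession

    // Sketch: switch the record merger implementation for the current session
    // before issuing DML, mirroring what each benchmark case does.
    def runWithMerger(spark: SparkSession, mergerImpl: String): Unit = {
      spark.sql(s"set ${HoodieWriteConfig.MERGER_IMPLS.key} = $mergerImpl")
      spark.sql("update hudi_tbl set s1 = 's1_new' where id > 0")
    }

    // Avro-based merging (the default) vs. the row-based Spark merger:
    // runWithMerger(spark, classOf[HoodieAvroRecordMerger].getName)
    // runWithMerger(spark, classOf[HoodieSparkRecordMerger].getName)

Keeping the merger choice in the session config is what lets the benchmark reuse one table definition for both merger implementations.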
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index 2cfa8be36f..ef84eb2c89 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieAvroRecordMerger, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieAvroRecordMerger, HoodieRecordMerger, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
@@ -36,14 +36,13 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
     assertTrue(with1("primaryKey") == "id")
     assertTrue(with1("type") == "cow")
     assertTrue(with1("payloadClass") == classOf[OverwriteWithLatestAvroPayload].getName)
-    assertTrue(with1("mergerStrategy") == StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
+    assertTrue(with1("mergerStrategy") == HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
 
     val ops2 = Map("primaryKey" -> "id",
       "preCombineField" -> "timestamp",
       "type" -> "mor",
       "payloadClass" -> classOf[DefaultHoodieRecordPayload].getName,
-      "mergerImpls" -> classOf[HoodieAvroRecordMerger].getName,
-      "mergerStrategy" -> StringUtils.DEFAULT_MERGER_STRATEGY_UUID
+      "mergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
     )
     val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2)
     assertTrue(ops2 == with2)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 39be0a8407..deeddd96df 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -280,8 +281,8 @@ public class HoodieDeltaStreamer implements Serializable {
         + "Hudi will pick most efficient implementation to perform merging/combining of the records (during update, reading MOR table, etc)")
     public String mergerImpls = HoodieAvroRecordMerger.class.getName();
 
-    @Parameter(names = {"--merger-strategy"}, description = "Id of merger strategy. Hudi will pick RecordMergers in merger-impls which has the same merger strategy id")
-    public String mergerStrategy = StringUtils.DEFAULT_MERGER_STRATEGY_UUID;
+    @Parameter(names = {"--merger-strategy"}, description = "Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in merger-impls which has the same merger strategy id")
+    public String mergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
 
     @Parameter(names = {"--schemaprovider-class"}, description = "subclass of org.apache.hudi.utilities.schema"
         + ".SchemaProvider to attach schemas to input & target table data, built in options: "
diff --git a/rfc/rfc-46/rfc-46.md b/rfc/rfc-46/rfc-46.md
index 192bdbf8c6..c5f611ee53 100644
--- a/rfc/rfc-46/rfc-46.md
+++ b/rfc/rfc-46/rfc-46.md
@@ -217,13 +217,13 @@ class HoodieRecord {
    /**
     * Get column in record to support RDDCustomColumnsSortPartitioner
     */
-   Object getRecordColumnValues(Schema recordSchema, String[] columns,
+   ComparableList getComparableColumnValues(Schema recordSchema, String[] columns,
            boolean consistentLogicalTimestampEnabled);
 
    /**
     * Support bootstrap.
     */
-   HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException;
+   HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException;
 
    /**
     * Rewrite record into new schema(add meta columns)