Posted to commits@hudi.apache.org by yu...@apache.org on 2022/10/06 13:08:01 UTC

[hudi] 05/07: [HUDI-3350][HUDI-3351] Support HoodieMerge API and Spark engine-specific HoodieRecord (#5627)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b2f5723ec090774687fad8248ba994ae258a6fed
Author: wulei <wu...@bytedance.com>
AuthorDate: Tue Jul 5 02:09:12 2022 +0800

    [HUDI-3350][HUDI-3351] Support HoodieMerge API and Spark engine-specific HoodieRecord (#5627)
    
    Co-authored-by: wangzixuan.wzxuan <wa...@bytedance.com>
---
 .../apache/hudi/config/HoodieCompactionConfig.java |  12 +
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  11 +
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   5 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |   4 +
 .../hudi/table/action/commit/BaseWriteHelper.java  |   7 +-
 .../table/action/commit/HoodieWriteHelper.java     |  11 +-
 .../hudi/testutils/HoodieWriteableTestTable.java   |  13 +-
 .../hudi/execution/FlinkLazyInsertIterable.java    |   2 +-
 .../org/apache/hudi/table/HoodieFlinkTable.java    |   2 -
 .../hudi/table/action/commit/FlinkMergeHelper.java |   2 +-
 .../hudi/table/action/commit/FlinkWriteHelper.java |   6 +-
 .../hudi/table/action/commit/JavaWriteHelper.java  |   7 +-
 .../org/apache/hudi/client/HoodieReadClient.java   |   1 -
 .../org/apache/hudi/client/SparkRDDReadClient.java |   3 +-
 .../bulkinsert/RDDBucketIndexPartitioner.java      |   3 +-
 .../bulkinsert/RDDConsistentBucketPartitioner.java |   8 +-
 .../hudi/io/storage/row/HoodieRowCreateHandle.java |   1 -
 .../commit/BulkInsertDataInternalWriterHelper.java |   2 +-
 .../spark/sql/HoodieCatalystExpressionUtils.scala  |  21 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   7 +-
 .../io/storage/row/TestHoodieRowCreateHandle.java  |   1 -
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  16 +-
 .../hudi/common/model/HoodieAvroIndexedRecord.java |  17 +-
 .../apache/hudi/common/model/HoodieAvroRecord.java |  49 +---
 .../hudi/common/model/HoodieAvroRecordMerge.java   |  57 +++++
 .../org/apache/hudi/common/model/HoodieMerge.java  |  23 +-
 .../org/apache/hudi/common/model/HoodieRecord.java |  36 +--
 .../hudi/common/table/HoodieTableConfig.java       |  15 ++
 .../hudi/common/table/HoodieTableMetaClient.java   |  10 +
 .../hudi/common/table/TableSchemaResolver.java     |   3 +-
 .../table/log/AbstractHoodieLogRecordReader.java   |   7 +
 .../table/log/HoodieMergedLogRecordScanner.java    |   7 +-
 .../table/log/block/HoodieHFileDataBlock.java      |   1 +
 .../table/log/block/HoodieParquetDataBlock.java    |   2 +-
 .../apache/hudi/common/util/HoodieRecordUtils.java |  68 +++++
 .../apache/hudi/common/util/ReflectionUtils.java   |  13 -
 .../apache/hudi/common/util/SpillableMapUtils.java |   4 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  41 +--
 .../hudi/common/util/HoodieRecordUtilsTest.java    |  47 ++++
 .../apache/hudi/configuration/FlinkOptions.java    |   8 +
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  15 +-
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |   6 +
 .../org/apache/hudi/table/HoodieTableSource.java   |   3 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |  20 +-
 .../table/format/mor/MergeOnReadTableState.java    |   9 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |   2 +
 .../apache/hudi/source/TestStreamReadOperator.java |   3 +-
 .../test/java/org/apache/hudi/utils/TestData.java  |   6 +-
 .../reader/DFSHoodieDatasetInputReader.java        |   1 -
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   6 +
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |   9 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   2 +
 .../spark/sql/hudi/HoodieInternalRowUtils.scala    | 281 +++++++++++++++++++++
 .../apache/spark/sql/hudi/HoodieSparkRecord.java   | 190 ++++++++++++++
 .../spark/sql/hudi/HoodieSparkRecordMerge.java     |  37 ++-
 .../org/apache/spark/sql/hudi/SparkHelpers.scala   |   2 +-
 .../apache/hudi/TestHoodieInternalRowUtils.scala   | 114 +++++++++
 .../hudi/TestStructTypeSchemaEvolutionUtils.scala  | 222 ++++++++++++++++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   2 +
 .../deltastreamer/HoodieDeltaStreamer.java         |   5 +
 60 files changed, 1279 insertions(+), 209 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index b6fe6d8aa0..cd9871cd4b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -123,6 +124,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + "compaction during each compaction run. By default. Hudi picks the log file "
           + "with most accumulated unmerged data");
 
+  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
+      .key("hoodie.compaction.merge.class")
+      .defaultValue(HoodieAvroRecordMerge.class.getName())
+      .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
+          + "types, such as Spark records or Flink records.");
+
   public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty
       .key("hoodie.compaction.lazy.block.read")
       .defaultValue("true")
@@ -356,6 +363,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withMergeClass(String mergeClass) {
+      compactionConfig.setValue(MERGE_CLASS_NAME, mergeClass);
+      return this;
+    }
+
     public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) {
       compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB, String.valueOf(targetIOPerCompactionInMB));
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3b3995c8f8..1087239612 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FileSystemRetryConfig;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -126,6 +127,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
           + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");
 
+  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
+      .key("hoodie.datasource.write.merge.class")
+      .defaultValue(HoodieAvroRecordMerge.class.getName())
+      .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
+          + "types, such as Spark records or Flink records.");
+
   public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = ConfigProperty
       .key("hoodie.datasource.write.keygenerator.class")
       .noDefaultValue()
@@ -1338,6 +1345,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(HoodiePayloadConfig.PAYLOAD_CLASS_NAME);
   }
 
+  public String getMergeClass() {
+    return getString(HoodieCompactionConfig.MERGE_CLASS_NAME);
+  }
+
   public int getTargetPartitionsPerDayBasedCompaction() {
     return getInt(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
   }
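
    [Not part of the patch] A minimal sketch of how a writer could opt into the new
    merge option through the builder added above. The surrounding builder calls
    (newBuilder, withPath, forTable, withCompactionConfig, build) are assumed from the
    pre-existing HoodieWriteConfig API; the path and table name are placeholders.

        import org.apache.hudi.common.model.HoodieAvroRecordMerge;
        import org.apache.hudi.config.HoodieCompactionConfig;
        import org.apache.hudi.config.HoodieWriteConfig;

        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_trips")                                // placeholder base path
            .forTable("trips")                                          // placeholder table name
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withMergeClass(HoodieAvroRecordMerge.class.getName()) // option added in this patch
                .build())
            .build();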
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 543c51a805..10dace3355 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -342,10 +342,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
       // writing the first record. So make a copy of the record to be merged
       HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
       try {
-        Option<HoodieRecord> combinedRecord =
-            hoodieRecord.combineAndGetUpdateValue(oldRecord,
-                schema,
-                props);
+        Option<HoodieRecord> combinedRecord = merge.combineAndGetUpdateValue(oldRecord, hoodieRecord, schema, props);
 
         if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(schema, props)) {
           // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 782ce0b7f2..bdaf201c7b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -29,7 +29,9 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
@@ -63,6 +65,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
    */
   protected final Schema tableSchema;
   protected final Schema tableSchemaWithMetaFields;
+  protected final HoodieMerge merge;
 
   /**
    * The write schema. In most cases the write schema is the same as the
@@ -107,6 +110,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
     schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
+    this.merge = HoodieRecordUtils.loadMerge(config.getMergeClass());
   }
 
   /**
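
    HoodieRecordUtils.loadMerge(...) used above comes from the new hudi-common utility
    listed in the diffstat; its body is not included in this message. A plausible
    reflection-based sketch of it (an assumption for illustration, not the committed code):

        import org.apache.hudi.common.model.HoodieMerge;
        import org.apache.hudi.exception.HoodieException;

        // Sketch only: instantiate the configured HoodieMerge implementation by class name.
        public static HoodieMerge loadMerge(String mergeClassName) {
          try {
            return (HoodieMerge) Class.forName(mergeClassName).getDeclaredConstructor().newInstance();
          } catch (ReflectiveOperationException e) {
            throw new HoodieException("Unable to instantiate merge class " + mergeClassName, e);
          }
        }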
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 984deda36d..8b2aed1bd6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -19,7 +19,9 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
@@ -81,9 +83,10 @@ public abstract class BaseWriteHelper<T, I, K, O, R> {
    */
   public I deduplicateRecords(
       I records, HoodieTable<T, I, K, O> table, int parallelism) {
-    return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema());
+    HoodieMerge merge = HoodieRecordUtils.loadMerge(table.getConfig().getMergeClass());
+    return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), merge);
   }
 
   public abstract I deduplicateRecords(
-      I records, HoodieIndex<?, ?> index, int parallelism, String schema);
+      I records, HoodieIndex<?, ?> index, int parallelism, String schema, HoodieMerge merge);
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 201c89b192..09922f1bcc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.index.HoodieIndex;
@@ -31,7 +32,6 @@ import org.apache.hudi.table.HoodieTable;
 
 public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
     HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
-
   private HoodieWriteHelper() {
   }
 
@@ -51,7 +51,7 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
 
   @Override
   public HoodieData<HoodieRecord<T>> deduplicateRecords(
-      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr) {
+      HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, HoodieMerge merge) {
     boolean isIndexingGlobal = index.isGlobal();
     final SerializableSchema schema = new SerializableSchema(schemaStr);
     // Auto-tunes the parallelism for reduce transformation based on the number of data partitions
@@ -64,10 +64,9 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
       return Pair.of(key, record);
     }).reduceByKey((rec1, rec2) -> {
       @SuppressWarnings("unchecked")
-      HoodieRecord reducedRec = rec2.preCombine(rec1, schema.get(), CollectionUtils.emptyProps());
-      HoodieKey reducedKey = rec1.getData().equals(reducedRec) ? rec1.getKey() : rec2.getKey();
-
-      return (HoodieRecord<T>) reducedRec.newInstance(reducedKey);
+      HoodieRecord<T> reducedRecord = merge.preCombine(rec1, rec2, schema.get(), CollectionUtils.emptyProps());
+      HoodieKey reducedKey = rec1.getData().equals(reducedRecord.getData()) ? rec1.getKey() : rec2.getKey();
+      return reducedRecord.newInstance(reducedKey);
     }, reduceParallelism).map(Pair::getRight);
   }
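
    The reduceByKey above folds records that share a key through the injected HoodieMerge.
    A simplified, single-JVM sketch of the same contract (keyed by record key only, as with
    a global index), mirroring the distributed version at the API level; the helper name is
    illustrative:

        import org.apache.hudi.common.model.HoodieMerge;
        import org.apache.hudi.common.model.HoodieRecord;

        import java.util.List;
        import java.util.stream.Collectors;

        // Sketch: local equivalent of the dedup contract, letting preCombine pick the survivor.
        static List<HoodieRecord> deduplicateLocally(List<HoodieRecord> records, HoodieMerge merge) {
          return records.stream()
              .collect(Collectors.groupingBy(r -> r.getKey().getRecordKey()))
              .values().stream()
              .map(group -> group.stream().reduce(merge::preCombine).get())
              .collect(Collectors.toList());
        }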
 
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index 522bf69134..92fd5327bf 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -19,6 +19,12 @@
 
 package org.apache.hudi.testutils;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -43,13 +49,6 @@ import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.orc.CompressionKind;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index 94bffc64cd..5ac00bd805 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -40,7 +40,7 @@ import java.util.List;
  *
  * @param <T> type of the payload
  */
-public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {
+public class FlinkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
 
   public FlinkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
                                  boolean areRecordsSorted,
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 2730b992ce..01f35e895e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -39,8 +39,6 @@ import org.apache.avro.specific.SpecificRecordBase;
 
 import java.util.List;
 
-import static org.apache.hudi.common.data.HoodieList.getList;
-
 /**
  * Impl of a flink hoodie table.
  */
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index ea5e139785..7d778008ec 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -31,7 +31,6 @@ import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
-import scala.collection.immutable.List;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
@@ -43,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 
 /**
  * Flink merge helper.
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index cfccf8584f..4bbb148c23 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.exception.HoodieUpsertException;
@@ -89,7 +90,7 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
 
   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr) {
+      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, HoodieMerge merge) {
     // If index used is global, then records are expected to differ in their partitionPath
     Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
         .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
@@ -97,7 +98,8 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord
     // caution that the avro schema is not serializable
     final Schema schema = new Schema.Parser().parse(schemaStr);
     return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
-      @SuppressWarnings("unchecked") final HoodieRecord reducedRec = rec2.preCombine(rec1);
+      @SuppressWarnings("unchecked")
+      final HoodieRecord reducedRec = merge.preCombine(rec1, rec2);
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 977dd4c690..798309a357 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.index.HoodieIndex;
@@ -56,7 +57,7 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
 
   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
-      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr) {
+      List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, HoodieMerge merge) {
     boolean isIndexingGlobal = index.isGlobal();
     Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
       HoodieKey hoodieKey = record.getKey();
@@ -68,11 +69,11 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
     final Schema schema = new Schema.Parser().parse(schemaStr);
     return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
       @SuppressWarnings("unchecked")
-      HoodieRecord reducedRec = rec2.preCombine(rec1, schema, CollectionUtils.emptyProps());
+      HoodieRecord<T> reducedRecord = merge.preCombine(rec1, rec2, schema, CollectionUtils.emptyProps());
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.
-      return (HoodieRecord<T>) reducedRec.newInstance(rec1.getKey());
+      return reducedRecord.newInstance(rec1.getKey());
     }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index 02f86cdc3e..2ada056099 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java
index adddabfdc0..d173d3d1a2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
@@ -59,7 +58,7 @@ import scala.Tuple2;
 /**
  * Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
  */
-public class SparkRDDReadClient<T extends HoodieRecordPayload<T>> implements Serializable {
+public class SparkRDDReadClient<T> implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
index dc5317f7d8..6c490f5beb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.spark.api.java.JavaRDD;
@@ -28,6 +27,6 @@ import org.apache.spark.api.java.JavaRDD;
  * Abstract of bucket index bulk_insert partitioner
  * TODO implement partitioner for SIMPLE BUCKET INDEX
  */
-public abstract class RDDBucketIndexPartitioner<T extends HoodieRecordPayload>
+public abstract class RDDBucketIndexPartitioner<T>
     implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
index e23723ac72..c9116515e9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
@@ -22,10 +22,10 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -59,7 +59,7 @@ import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_C
 /**
  * A partitioner for (consistent hashing) bucket index used in bulk_insert
  */
-public class RDDConsistentBucketPartitioner<T extends HoodieRecordPayload> extends RDDBucketIndexPartitioner<T> {
+public class RDDConsistentBucketPartitioner<T> extends RDDBucketIndexPartitioner<T> {
 
   private static final Logger LOG = LogManager.getLogger(RDDConsistentBucketPartitioner.class);
 
@@ -235,8 +235,8 @@ public class RDDConsistentBucketPartitioner<T extends HoodieRecordPayload> exten
     final String[] sortColumns = sortColumnNames;
     final SerializableSchema schema = new SerializableSchema(HoodieAvroUtils.addMetadataFields((new Schema.Parser().parse(table.getConfig().getSchema()))));
     Comparator<HoodieRecord<T>> comparator = (Comparator<HoodieRecord<T>> & Serializable) (t1, t2) -> {
-      Object obj1 = HoodieAvroUtils.getRecordColumnValues(t1, sortColumns, schema, consistentLogicalTimestampEnabled);
-      Object obj2 = HoodieAvroUtils.getRecordColumnValues(t2, sortColumns, schema, consistentLogicalTimestampEnabled);
+      Object obj1 = HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) t1, sortColumns, schema, consistentLogicalTimestampEnabled);
+      Object obj2 = HoodieAvroUtils.getRecordColumnValues((HoodieAvroRecord) t2, sortColumns, schema, consistentLogicalTimestampEnabled);
       return ((Comparable) obj1).compareTo(obj2);
     };
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index 12c444d850..9da04f7260 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.model.HoodieInternalRow;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.util.HoodieTimer;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
index 009123b4a6..12e9dda81a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
@@ -20,7 +20,7 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.HoodieInternalWriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
index f2d6f0381a..fa691116ee 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql
 
 import org.apache.hudi.SparkAdapterSupport.sparkAdapter
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Like, Literal, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Like, Literal, MutableProjection, SubqueryExpression, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -108,6 +108,23 @@ object HoodieCatalystExpressionUtils {
     })
   }
 
+  /**
+   * Generates instance of [[MutableProjection]] projecting row of one [[StructType]] into another [[StructType]]
+   *
+   * NOTE: No safety checks are executed to validate that this projection is actually feasible,
+   *       it's up to the caller to make sure that such projection is possible.
+   *
+   * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if
+   *       B is a subset of A
+   */
+  def generateMutableProjection(from: StructType, to: StructType): MutableProjection = {
+    val attrs = from.toAttributes
+    val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
+    val targetExprs = to.fields.map(f => attrsMap(f.name))
+
+    GenerateMutableProjection.generate(targetExprs, attrs)
+  }
+
   /**
    * Parses and resolves expression against the attributes of the given table schema.
    *
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 9787387a1e..8ea0d1fdc5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -38,12 +38,14 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -468,8 +470,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
+    HoodieMerge merge = new HoodieAvroRecordMerge();
     int dedupParallelism = records.getNumPartitions() + 100;
-    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema());
+    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), merge);
     List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
     assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
     assertEquals(1, dedupedRecs.size());
@@ -479,7 +482,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // non-Global dedup should be done based on both recordKey and partitionPath
     index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(false);
-    dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema());
+    dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), merge);
     dedupedRecs = dedupedRecsRdd.collectAsList();
     assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
     assertEquals(2, dedupedRecs.size());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
index b4e75a02b2..ad73a256a6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
@@ -21,7 +21,6 @@ package org.apache.hudi.io.storage.row;
 import org.apache.hudi.client.HoodieInternalWriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.StringUtils;
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 9f1c25d647..1c638c5d63 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -22,9 +22,9 @@ import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -766,9 +766,7 @@ public class HoodieAvroUtils {
     Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
     switch (newSchema.getType()) {
       case RECORD:
-        if (!(oldRecord instanceof IndexedRecord)) {
-          throw new IllegalArgumentException("cannot rewrite record with different type");
-        }
+        ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type");
         IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
         List<Schema.Field> fields = newSchema.getFields();
         GenericData.Record newRecord = new GenericData.Record(newSchema);
@@ -800,9 +798,7 @@ public class HoodieAvroUtils {
         }
         return newRecord;
       case ARRAY:
-        if (!(oldRecord instanceof Collection)) {
-          throw new IllegalArgumentException("cannot rewrite record with different type");
-        }
+        ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type");
         Collection array = (Collection)oldRecord;
         List<Object> newArray = new ArrayList();
         fieldNames.push("element");
@@ -812,9 +808,7 @@ public class HoodieAvroUtils {
         fieldNames.pop();
         return newArray;
       case MAP:
-        if (!(oldRecord instanceof Map)) {
-          throw new IllegalArgumentException("cannot rewrite record with different type");
-        }
+        ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot rewrite record with different type");
         Map<Object, Object> map = (Map<Object, Object>) oldRecord;
         Map<Object, Object> newMap = new HashMap<>();
         fieldNames.push("value");
@@ -830,7 +824,7 @@ public class HoodieAvroUtils {
     }
   }
 
-  private static String createFullName(Deque<String> fieldNames) {
+  public static String createFullName(Deque<String> fieldNames) {
     String result = "";
     if (!fieldNames.isEmpty()) {
       List<String> parentNames = new ArrayList<>();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index ac2df00151..daec2fee03 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -48,7 +48,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   }
 
   public HoodieAvroIndexedRecord(HoodieKey key, IndexedRecord data, HoodieOperation operation) {
-    super(key, data, operation);
+    super(key, data, operation, null);
   }
 
   public HoodieAvroIndexedRecord(HoodieRecord<IndexedRecord> record) {
@@ -67,11 +67,6 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
     return Option.of(data);
   }
 
-  @Override
-  public Comparable<?> getOrderingValue() {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public HoodieRecord newInstance() {
     throw new UnsupportedOperationException();
@@ -99,16 +94,6 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
         .map(Object::toString).orElse(null);
   }
 
-  @Override
-  public HoodieRecord preCombine(HoodieRecord<IndexedRecord> previousRecord) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException {
-    return Option.empty();
-  }
-
   @Override
   public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException {
     ValidationUtils.checkState(other instanceof HoodieAvroIndexedRecord);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 9a9011da37..7bc6af89f8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -21,11 +21,10 @@ package org.apache.hudi.common.model;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.metadata.HoodieMetadataPayload;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -39,7 +38,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 
 public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> {
   public HoodieAvroRecord(HoodieKey key, T data) {
@@ -47,7 +46,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
   }
 
   public HoodieAvroRecord(HoodieKey key, T data, HoodieOperation operation) {
-    super(key, data, operation);
+    super(key, data, operation, null);
   }
 
   public HoodieAvroRecord(HoodieRecord<T> record) {
@@ -106,34 +105,6 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
   // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here
   //       for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload`
   //       is complete
-  //
-  // TODO cleanup
-
-  // NOTE: This method is assuming semantic that `preCombine` operation is bound to pick one or the other
-  //       object, and may not create a new one
-  @Override
-  public HoodieRecord<T> preCombine(HoodieRecord<T> previousRecord) {
-    T picked = unsafeCast(getData().preCombine(previousRecord.getData()));
-    if (picked instanceof HoodieMetadataPayload) {
-      // NOTE: HoodieMetadataPayload return a new payload
-      return new HoodieAvroRecord<>(getKey(), picked, getOperation());
-    }
-    return picked.equals(getData()) ? this : previousRecord;
-  }
-
-  // NOTE: This method is assuming semantic that only records bearing the same (partition, key) could
-  //       be combined
-  @Override
-  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException {
-    Option<IndexedRecord> previousRecordAvroPayload = previousRecord.toIndexedRecord(schema, props);
-    if (!previousRecordAvroPayload.isPresent()) {
-      return Option.empty();
-    }
-
-    return getData().combineAndGetUpdateValue(previousRecordAvroPayload.get(), schema, props)
-        .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload));
-  }
-
   @Override
   public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException {
     ValidationUtils.checkState(other instanceof HoodieAvroRecord);
@@ -141,7 +112,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
         (GenericRecord) toIndexedRecord(readerSchema, new Properties()).get(),
         (GenericRecord) other.toIndexedRecord(readerSchema, new Properties()).get(),
         writerSchema);
-    return new HoodieAvroRecord(getKey(), instantiateRecordPayloadWrapper(mergedPayload, getPrecombineValue(getData())), getOperation());
+    return new HoodieAvroRecord(getKey(), instantiateRecordPayloadWrapper(mergedPayload, getOrderingValue()), getOperation());
   }
 
   @Override
@@ -234,20 +205,10 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
   @Nonnull
   private T instantiateRecordPayloadWrapper(Object combinedAvroPayload, Comparable newPreCombineVal) {
     return unsafeCast(
-        ReflectionUtils.loadPayload(
+        HoodieRecordUtils.loadPayload(
             getData().getClass().getCanonicalName(),
             new Object[]{combinedAvroPayload, newPreCombineVal},
             GenericRecord.class,
             Comparable.class));
   }
-
-  private static <T extends HoodieRecordPayload> Comparable getPrecombineValue(T data) {
-    if (data instanceof BaseAvroPayload) {
-      return ((BaseAvroPayload) data).orderingVal;
-    }
-
-    return -1;
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java
new file mode 100644
index 0000000000..bea89efdc1
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
+public class HoodieAvroRecordMerge implements HoodieMerge {
+  @Override
+  public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
+    HoodieRecordPayload picked = unsafeCast(((HoodieAvroRecord) newer).getData().preCombine(((HoodieAvroRecord) older).getData()));
+    if (picked instanceof HoodieMetadataPayload) {
+      // NOTE: HoodieMetadataPayload return a new payload
+      return new HoodieAvroRecord(newer.getKey(), picked, newer.getOperation());
+    }
+    return picked.equals(((HoodieAvroRecord) newer).getData()) ? newer : older;
+  }
+
+  @Override
+  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    Option<IndexedRecord> previousRecordAvroPayload;
+    if (older instanceof HoodieAvroIndexedRecord) {
+      previousRecordAvroPayload = Option.ofNullable(((HoodieAvroIndexedRecord) older).getData());
+    } else {
+      previousRecordAvroPayload = ((HoodieRecordPayload)older.getData()).getInsertValue(schema, props);
+    }
+    if (!previousRecordAvroPayload.isPresent()) {
+      return Option.empty();
+    }
+
+    return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousRecordAvroPayload.get(), schema, props)
+        .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload));
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java
similarity index 53%
copy from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
copy to hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java
index dc5317f7d8..6becf35591 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java
@@ -16,18 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.execution.bulkinsert;
+package org.apache.hudi.common.model;
 
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.avro.Schema;
+import org.apache.hudi.common.util.Option;
 
-import org.apache.spark.api.java.JavaRDD;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
 
 /**
- * Abstract of bucket index bulk_insert partitioner
- * TODO implement partitioner for SIMPLE BUCKET INDEX
+ * HoodieMerge defines how to merge two records. It is a stateless component.
+ * It can implement the merging logic for HoodieRecords of different engines
+ * and avoid the performance overhead caused by serializing/deserializing Avro payloads.
  */
-public abstract class RDDBucketIndexPartitioner<T extends HoodieRecordPayload>
-    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
+public interface HoodieMerge extends Serializable {
+  
+  HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
+
+  Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
 }
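
    Because the interface is engine-agnostic and stateless, custom implementations can be
    plugged in through the merge-class options added in this patch. A hypothetical
    implementation (name and semantics are illustrative, not part of this change) that
    always keeps the incoming record:

        import org.apache.avro.Schema;
        import org.apache.hudi.common.model.HoodieMerge;
        import org.apache.hudi.common.model.HoodieRecord;
        import org.apache.hudi.common.util.Option;

        import java.io.IOException;
        import java.util.Properties;

        // Hypothetical merge strategy: the newer record always wins.
        public class KeepNewestMerge implements HoodieMerge {

          @Override
          public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
            // preCombine picks one of the two records; here the newer one wins unconditionally.
            return newer;
          }

          @Override
          public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer,
                                                               Schema schema, Properties props) throws IOException {
            // Update path: surface the newer record as the combined result.
            return Option.of(newer);
          }
        }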
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 60ba6cfa72..50f530651f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -138,25 +138,35 @@ public abstract class HoodieRecord<T> implements Serializable {
    */
   private HoodieOperation operation;
 
+  /**
+   * For purposes of preCombining.
+   */
+  private Comparable<?> orderingVal;
+
   public HoodieRecord(HoodieKey key, T data) {
-    this(key, data, null);
+    this(key, data, null, null);
   }
 
-  public HoodieRecord(HoodieKey key, T data, HoodieOperation operation) {
+  public HoodieRecord(HoodieKey key, T data, Comparable<?> orderingVal) {
+    this(key, data, null, orderingVal);
+  }
+
+  public HoodieRecord(HoodieKey key, T data, HoodieOperation operation, Comparable<?> orderingVal) {
     this.key = key;
     this.data = data;
     this.currentLocation = null;
     this.newLocation = null;
     this.sealed = false;
     this.operation = operation;
+    // default natural order is 0
+    this.orderingVal = orderingVal == null ? 0 : orderingVal;
   }
 
   public HoodieRecord(HoodieRecord<T> record) {
-    this(record.key, record.data);
+    this(record.key, record.data, record.operation, record.orderingVal);
     this.currentLocation = record.currentLocation;
     this.newLocation = record.newLocation;
     this.sealed = record.sealed;
-    this.operation = record.operation;
   }
 
   public HoodieRecord() {
@@ -176,15 +186,17 @@ public abstract class HoodieRecord<T> implements Serializable {
     return operation;
   }
 
+  public Comparable<?> getOrderingValue() {
+    return orderingVal;
+  }
+
   public T getData() {
     if (data == null) {
-      throw new IllegalStateException("Payload already deflated for record.");
+      throw new IllegalStateException("HoodieRecord already deflated for record.");
     }
     return data;
   }
 
-  public abstract Comparable<?> getOrderingValue();
-
   /**
    * Release the actual payload, to ease memory pressure. To be called after the record has been written to storage.
    * Once deflated, cannot be inflated.
@@ -288,16 +300,6 @@ public abstract class HoodieRecord<T> implements Serializable {
   //       for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload`
   //       is complete
   //
-  // TODO cleanup
-
-  // NOTE: This method is assuming semantic that `preCombine` operation is bound to pick one or the other
-  //       object, and may not create a new one
-  public abstract HoodieRecord<T> preCombine(HoodieRecord<T> previousRecord);
-
-  // NOTE: This method is assuming semantic that only records bearing the same (partition, key) could
-  //       be combined
-  public abstract Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException;
-
   public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException;
 
   public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index f295b0019c..6596d5b9a2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.OrderedProperties;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -172,6 +173,12 @@ public class HoodieTableConfig extends HoodieConfig {
       .withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then "
           + " produce a new base file.");
 
+  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
+      .key("hoodie.compaction.merge.class")
+      .defaultValue(HoodieAvroRecordMerge.class.getName())
+      .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
+          + "types, such as Spark records or Flink records.");
+
   public static final ConfigProperty<String> ARCHIVELOG_FOLDER = ConfigProperty
       .key("hoodie.archivelog.folder")
       .defaultValue("archived")
@@ -498,6 +505,14 @@ public class HoodieTableConfig extends HoodieConfig {
         "org.apache.hudi");
   }
 
+  /**
+   * Read the hoodie merge class for HoodieRecords from the table properties.
+   */
+  public String getMergeClass() {
+    return getStringOrDefault(MERGE_CLASS_NAME).replace("com.uber.hoodie",
+        "org.apache.hudi");
+  }
+
   public String getPreCombineField() {
     return getString(PRECOMBINE_FIELD);
   }
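
The merge implementation is resolved from these table properties at runtime. A minimal sketch of
that lookup, assuming an already-built HoodieTableMetaClient named metaClient is in scope
(the remaining classes and methods are the ones introduced or touched by this patch):

    import org.apache.hudi.common.model.HoodieMerge;
    import org.apache.hudi.common.table.HoodieTableConfig;
    import org.apache.hudi.common.util.HoodieRecordUtils;

    // Falls back to HoodieAvroRecordMerge when hoodie.compaction.merge.class is not set.
    HoodieTableConfig tableConfig = metaClient.getTableConfig();
    String mergeClassFQN = tableConfig.getMergeClass();
    HoodieMerge merge = HoodieRecordUtils.loadMerge(mergeClassFQN);
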
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 610a0f4185..42e2b041e3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -726,6 +726,7 @@ public class HoodieTableMetaClient implements Serializable {
     private String recordKeyFields;
     private String archiveLogFolder;
     private String payloadClassName;
+    private String mergeClassName;
     private Integer timelineLayoutVersion;
     private String baseFileFormat;
     private String preCombineField;
@@ -794,6 +795,11 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
+    public PropertyBuilder setMergeClassName(String mergeClassName) {
+      this.mergeClassName = mergeClassName;
+      return this;
+    }
+
     public PropertyBuilder setPayloadClass(Class<? extends HoodieRecordPayload> payloadClass) {
       return setPayloadClassName(payloadClass.getName());
     }
@@ -1022,6 +1028,10 @@ public class HoodieTableMetaClient implements Serializable {
         tableConfig.setValue(HoodieTableConfig.PAYLOAD_CLASS_NAME, payloadClassName);
       }
 
+      if (mergeClassName != null) {
+        tableConfig.setValue(HoodieTableConfig.MERGE_CLASS_NAME, mergeClassName);
+      }
+
       if (null != tableCreateSchema) {
         tableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA, tableCreateSchema);
       }
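
When a table is created, the new setter slots into the existing PropertyBuilder chain. A hedged
sketch of that wiring, where the option values are illustrative and hadoopConf/basePath are assumed
to be in scope:

    HoodieTableMetaClient.withPropertyBuilder()
        .setTableType(HoodieTableType.MERGE_ON_READ)
        .setTableName("trips")
        .setPayloadClassName(OverwriteWithLatestAvroPayload.class.getName())
        .setMergeClassName(HoodieAvroRecordMerge.class.getName())
        .initTable(hadoopConf, basePath);
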
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 473e313ab9..56d528f2c1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -41,13 +41,14 @@ import org.apache.hudi.exception.InvalidTableException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.io.storage.HoodieAvroHFileReader;
+import org.apache.hudi.io.storage.HoodieAvroOrcReader;
 import org.apache.hudi.util.Lazy;
 
 import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 4b1d4e8a95..2c3449fdae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -97,6 +97,8 @@ public abstract class AbstractHoodieLogRecordReader {
   private final String payloadClassFQN;
   // preCombine field
   private final String preCombineField;
+  // Stateless component for merging records
+  private final String mergeClassFQN;
   private final Properties payloadProps = new Properties();
   // simple key gen fields
   private Option<Pair<String, String>> simpleKeyGenFields = Option.empty();
@@ -165,6 +167,7 @@ public abstract class AbstractHoodieLogRecordReader {
     if (this.preCombineField != null) {
       this.payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, this.preCombineField);
     }
+    this.mergeClassFQN = tableConfig.getMergeClass();
     this.totalLogFiles.addAndGet(logFilePaths.size());
     this.logFilePaths = logFilePaths;
     this.reverseReader = reverseReader;
@@ -530,6 +533,10 @@ public abstract class AbstractHoodieLogRecordReader {
     return payloadClassFQN;
   }
 
+  protected String getMergeClassFQN() {
+    return mergeClassFQN;
+  }
+
   public Option<String> getPartitionName() {
     return partitionName;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index dd079de364..7b4b53dca9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -24,11 +24,13 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -80,6 +82,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
   // Stores the total time taken to perform reading and merging of log blocks
   private long totalTimeTakenToReadAndMergeBlocks;
 
+  private final HoodieMerge merge;
+
   @SuppressWarnings("unchecked")
   protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
                                          String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
@@ -97,6 +101,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
           new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
       this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+      this.merge = HoodieRecordUtils.loadMerge(getMergeClassFQN());
     } catch (IOException e) {
       throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
     }
@@ -152,7 +157,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
 
       HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
       HoodieRecordPayload oldValue = oldRecord.getData();
-      HoodieRecordPayload combinedValue = (HoodieRecordPayload)hoodieRecord.preCombine(oldRecord, readerSchema, this.getPayloadProps()).getData();
+      HoodieRecordPayload combinedValue = (HoodieRecordPayload) merge.preCombine(oldRecord, hoodieRecord, readerSchema, this.getPayloadProps()).getData();
       // If combinedValue is oldValue, no need rePut oldRecord
       if (combinedValue != oldValue) {
         HoodieOperation operation = hoodieRecord.getOperation();
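
The scanner now delegates record combining to the pluggable HoodieMerge loaded above. A hedged
sketch of a custom implementation follows; the method signatures are inferred from the call sites
in this patch (preCombine here, combineAndGetUpdateValue in the Flink MergeIterator further below)
and may not match the final interface exactly:

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieMerge;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.Option;

    public class KeepLargestOrderingValueMerge implements HoodieMerge {

      @Override
      public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) {
        // Keep whichever record carries the larger ordering value; ties favor the newer record.
        Comparable newerOrdering = (Comparable) newer.getOrderingValue();
        return newerOrdering.compareTo(older.getOrderingValue()) >= 0 ? newer : older;
      }

      @Override
      public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
        // Treat the incoming record as the full update; returning Option.empty() would signal a delete.
        return Option.of(newer);
      }
    }
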
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 297d7683f2..ca2c46022c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.inline.InLineFSUtils;
 import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index ebe53fe471..98f5dcf3a0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -31,8 +31,8 @@ import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetReaderIterator;
-import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieAvroFileReader.HoodieRecordTransformIterator;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
new file mode 100644
index 0000000000..075d117fe2
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieMerge;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.exception.HoodieException;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A utility class for HoodieRecord.
+ */
+public class HoodieRecordUtils {
+
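+  // Shared instances keyed by class name; HoodieMerge implementations are expected to be
+  // stateless (see the config documentation), so a single instance per class is reused.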
+  private static final Map<String, Object> INSTANCE_CACHE = new HashMap<>();
+
+  /**
+   * Instantiate the given HoodieMerge implementation, caching one instance per class name.
+   */
+  public static HoodieMerge loadMerge(String mergeClass) {
+    try {
+      HoodieMerge merge = (HoodieMerge) INSTANCE_CACHE.get(mergeClass);
+      if (null == merge) {
+        synchronized (HoodieMerge.class) {
+          merge = (HoodieMerge) INSTANCE_CACHE.get(mergeClass);
+          if (null == merge) {
+            merge = (HoodieMerge) ReflectionUtils.loadClass(mergeClass, new Object[]{});
+            INSTANCE_CACHE.put(mergeClass, merge);
+          }
+        }
+      }
+      return merge;
+    } catch (HoodieException e) {
+      throw new HoodieException("Unable to instantiate hoodie merge class ", e);
+    }
+  }
+
+  /**
+   * Instantiate a given class with an avro record payload.
+   */
+  public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass, Object[] payloadArgs,
+                                                              Class<?>... constructorArgTypes) {
+    try {
+      return (T) ReflectionUtils.getClass(recordPayloadClass).getConstructor(constructorArgTypes).newInstance(payloadArgs);
+    } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+      throw new HoodieException("Unable to instantiate payload class ", e);
+    }
+  }
+}
\ No newline at end of file
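
loadPayload keeps the constructor-reflection behavior that used to live in ReflectionUtils (removed
further down); SpillableMapUtils remains its main caller. A minimal usage sketch, with avroRecord
and orderingVal assumed to be in scope and the payload class name purely illustrative:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.common.util.HoodieRecordUtils;

    // Instantiates the payload through its (GenericRecord, Comparable) constructor,
    // mirroring the SpillableMapUtils call sites.
    HoodieRecordPayload payload = HoodieRecordUtils.loadPayload(
        "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
        new Object[] {avroRecord, orderingVal},
        GenericRecord.class, Comparable.class);
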
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index 6ee7928c75..b3e178320b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.util;
 
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.log4j.LogManager;
@@ -69,18 +68,6 @@ public class ReflectionUtils {
     }
   }
 
-  /**
-   * Instantiate a given class with a generic record payload.
-   */
-  public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass, Object[] payloadArgs,
-      Class<?>... constructorArgTypes) {
-    try {
-      return (T) getClass(recordPayloadClass).getConstructor(constructorArgTypes).newInstance(payloadArgs);
-    } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-      throw new HoodieException("Unable to instantiate payload class ", e);
-    }
-  }
-
   /**
    * Creates an instance of the given class. Use this version when dealing with interface types as constructor args.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
index d4bafd9c9f..d2d91bbfb6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
@@ -137,7 +137,7 @@ public class SpillableMapUtils {
     HoodieOperation operation = withOperationField
         ? HoodieOperation.fromName(getNullableValAsString(record, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
     HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath),
-        ReflectionUtils.loadPayload(payloadClazz, new Object[]{record, preCombineVal}, GenericRecord.class,
+        HoodieRecordUtils.loadPayload(payloadClazz, new Object[]{record, preCombineVal}, GenericRecord.class,
             Comparable.class), operation);
 
     return (R) hoodieRecord;
@@ -163,7 +163,7 @@ public class SpillableMapUtils {
    */
   public static <R> R generateEmptyPayload(String recKey, String partitionPath, Comparable orderingVal, String payloadClazz) {
     HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath),
-        ReflectionUtils.loadPayload(payloadClazz, new Object[] {null, orderingVal}, GenericRecord.class, Comparable.class));
+        HoodieRecordUtils.loadPayload(payloadClazz, new Object[] {null, orderingVal}, GenericRecord.class, Comparable.class));
     return (R) hoodieRecord;
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 2d94a13c0f..8b5bad3233 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -158,9 +158,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     List<FileSlice> partitionFileSlices =
         HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
 
-    return engineContext.parallelize(partitionFileSlices)
+    return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) :
+        engineContext.parallelize(partitionFileSlices))
         .flatMap(
-            (SerializableFunction<FileSlice, Iterator<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>>) fileSlice -> {
+            (SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
               // NOTE: Since this will be executed by executors, we can't access previously cached
               //       readers, and therefore have to always open new ones
               Pair<HoodieAvroFileReader, HoodieMetadataMergedLogRecordReader> readers =
@@ -171,31 +172,31 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
                 HoodieAvroFileReader baseFileReader = readers.getKey();
                 HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
 
-            if (baseFileReader == null && logRecordScanner == null) {
-              // TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ?
-              return Collections.emptyIterator();
-            }
+                if (baseFileReader == null && logRecordScanner == null) {
+                  // TODO: what do we do if neither exists? Should we throw an exception and let the caller do the fallback?
+                  return Collections.emptyIterator();
+                }
 
-            boolean fullKeys = false;
+              boolean fullKeys = false;
 
-            Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
-                readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, timings);
+              Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
+                  readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, timings);
 
-            List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
-                readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+              List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
+                  readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
 
-            LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
-                sortedKeyPrefixes.size(), timings));
+              LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
+                  sortedKeyPrefixes.size(), timings));
 
-            return mergedRecords.stream()
+              return mergedRecords.stream()
                 .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
                 .iterator();
-          } catch (IOException ioe) {
-            throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
-          } finally {
-            closeReader(readers);
-          }
-        })
+            } catch (IOException ioe) {
+              throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
+            } finally {
+              closeReader(readers);
+            }
+          })
         .filter(Objects::nonNull);
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java b/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java
new file mode 100644
index 0000000000..0c51571c9e
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
+import org.apache.hudi.common.model.HoodieMerge;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class HoodieRecordUtilsTest {
+
+  @Test
+  void loadHoodieMerge() {
+    String mergeClassName = HoodieAvroRecordMerge.class.getName();
+    HoodieMerge merge1 = HoodieRecordUtils.loadMerge(mergeClassName);
+    HoodieMerge merge2 = HoodieRecordUtils.loadMerge(mergeClassName);
+    assertEquals(merge1.getClass().getName(), mergeClassName);
+    assertEquals(merge1, merge2);
+  }
+
+  @Test
+  void loadPayload() {
+    String payloadClassName = DefaultHoodieRecordPayload.class.getName();
+    HoodieRecordPayload payload = HoodieRecordUtils.loadPayload(payloadClassName, new Object[]{null, 0}, GenericRecord.class, Comparable.class);
+    assertEquals(payload.getClass().getName(), payloadClassName);
+  }
+}
\ No newline at end of file
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 27f19f4a66..3ac76cb7b3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -322,6 +323,13 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(WriteOperationType.UPSERT.value())
       .withDescription("The write operation, that this write should do");
 
+  public static final ConfigOption<String> MERGE_CLASS_NAME = ConfigOptions
+      .key("write.merge.class")
+      .stringType()
+      .defaultValue(HoodieAvroRecordMerge.class.getName())
+      .withDescription("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
+          + "types, such as Spark records or Flink records.");
+
   /**
    * Flag to indicate whether to drop duplicates before insert/upsert.
    * By default false to gain extra performance.
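
On the Flink side the same merge class is threaded through the job configuration (and, further
below, into FlinkStreamerConfig and StreamerUtil). A small sketch of setting it programmatically;
the table path and the custom merge class name are purely illustrative:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/tmp/hoodie/trips");
    // Any HoodieMerge implementation on the classpath; defaults to HoodieAvroRecordMerge.
    conf.setString(FlinkOptions.MERGE_CLASS_NAME, "com.example.CustomRecordMerge");
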
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 64178a82fb..1f51df7bc4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -23,9 +23,11 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -102,6 +104,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
 
   private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
 
+  private transient HoodieMerge merge;
+
   /**
    * Total size tracer.
    */
@@ -121,6 +125,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     this.tracer = new TotalSizeTracer(this.config);
     initBuffer();
     initWriteFunction();
+    initMergeClass();
   }
 
   @Override
@@ -195,6 +200,12 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     }
   }
 
+  private void initMergeClass() {
+    String mergeClassName = metaClient.getTableConfig().getMergeClass();
+    LOG.info("init hoodie merge with class [{}]", mergeClassName);
+    merge = HoodieRecordUtils.loadMerge(mergeClassName);
+  }
+
   /**
    * Represents a data item in the buffer, this is needed to reduce the
    * memory footprint.
@@ -421,7 +432,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     List<HoodieRecord> records = bucket.writeBuffer();
     ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
     if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
-      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema());
+      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), merge);
     }
     bucket.preWrite(records);
     final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
@@ -457,7 +468,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
             if (records.size() > 0) {
               if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
                 records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1,
-                    this.writeClient.getConfig().getSchema());
+                    this.writeClient.getConfig().getSchema(), merge);
               }
               bucket.preWrite(records);
               writeStatus.addAll(writeFunction.apply(records, currentInstant));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 345c9cde6c..3143f55c63 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.streamer;
 
 import org.apache.hudi.client.utils.OperationConverter;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -118,6 +119,10 @@ public class FlinkStreamerConfig extends Configuration {
       + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value.")
   public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
 
+  @Parameter(names = {"--merge-class"}, description = "Implements of HoodieMerge, that defines how to merge two records."
+      + "Implement your own, if you want to implement specific record merge logic.")
+  public String mergeClassName = HoodieAvroRecordMerge.class.getName();
+
   @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
       + "is purely new data/inserts to gain speed).", converter = OperationConverter.class)
   public WriteOperationType operation = WriteOperationType.UPSERT;
@@ -363,6 +368,7 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setString(FlinkOptions.OPERATION, config.operation.value());
     conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
     conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName);
+    conf.setString(FlinkOptions.MERGE_CLASS_NAME, config.mergeClassName);
     conf.setBoolean(FlinkOptions.PRE_COMBINE, config.preCombine);
     conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
     conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 4533ef7179..72bb548a2d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -453,7 +453,8 @@ public class HoodieTableSource implements
         tableAvroSchema.toString(),
         AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
         inputSplits,
-        conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+        conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","),
+        conf.getString(FlinkOptions.MERGE_CLASS_NAME));
     return MergeOnReadInputFormat.builder()
         .config(this.conf)
         .tableState(hoodieTableState)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index e2e8079a47..f5cadcef05 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -18,12 +18,15 @@
 
 package org.apache.hudi.table.format.mor;
 
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieMerge;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -208,7 +211,8 @@ public class MergeOnReadInputFormat
           this.requiredPos,
           this.emitDelete,
           this.tableState.getOperationPos(),
-          getFullSchemaReader(split.getBasePath().get()));
+          getFullSchemaReader(split.getBasePath().get()),
+          tableState.getMergeClass());
     } else {
       throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
           + "file path: " + split.getBasePath()
@@ -700,6 +704,8 @@ public class MergeOnReadInputFormat
 
     private final InstantRange instantRange;
 
+    private final HoodieMerge merge;
+
     // add the flag because the flink ParquetColumnarRowSplitReader is buggy:
     // method #reachedEnd() returns false after it returns true.
     // refactor it out once FLINK-22370 is resolved.
@@ -722,11 +728,12 @@ public class MergeOnReadInputFormat
         int[] requiredPos,
         boolean emitDelete,
         int operationPos,
-        ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
+        ParquetColumnarRowSplitReader reader,
+        String mergeClass) { // the reader should be with full schema
       this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, tableSchema,
           Option.of(RowDataProjection.instance(requiredRowType, requiredPos)),
           Option.of(record -> buildAvroRecordBySchema(record, requiredSchema, requiredPos, new GenericRecordBuilder(requiredSchema))),
-          emitDelete, operationPos, reader);
+          emitDelete, operationPos, reader, mergeClass);
     }
 
     public MergeIterator(
@@ -740,7 +747,8 @@ public class MergeOnReadInputFormat
         Option<Function<IndexedRecord, GenericRecord>> avroProjection,
         boolean emitDelete,
         int operationPos,
-        ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
+        ParquetColumnarRowSplitReader reader,
+        String mergeClass) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
       this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, hadoopConf);
@@ -753,6 +761,7 @@ public class MergeOnReadInputFormat
       this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
       this.projection = projection;
       this.instantRange = split.getInstantRange().orElse(null);
+      this.merge = HoodieRecordUtils.loadMerge(mergeClass);
     }
 
     @Override
@@ -841,7 +850,8 @@ public class MergeOnReadInputFormat
         String curKey) throws IOException {
       final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey);
       GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
-      return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema, payloadProps);
+      Option<HoodieRecord> resultRecord = merge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(historyAvroRecord), record, tableSchema, payloadProps);
+      return ((HoodieAvroIndexedRecord) resultRecord.get()).toIndexedRecord();
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 36dfecbb79..bbb21db7f8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -41,6 +41,7 @@ public class MergeOnReadTableState implements Serializable {
   private final List<MergeOnReadInputSplit> inputSplits;
   private final String[] pkFields;
   private final int operationPos;
+  private final String mergeClass;
 
   public MergeOnReadTableState(
       RowType rowType,
@@ -48,7 +49,8 @@ public class MergeOnReadTableState implements Serializable {
       String avroSchema,
       String requiredAvroSchema,
       List<MergeOnReadInputSplit> inputSplits,
-      String[] pkFields) {
+      String[] pkFields,
+      String mergeClass) {
     this.rowType = rowType;
     this.requiredRowType = requiredRowType;
     this.avroSchema = avroSchema;
@@ -56,6 +58,7 @@ public class MergeOnReadTableState implements Serializable {
     this.inputSplits = inputSplits;
     this.pkFields = pkFields;
     this.operationPos = rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD);
+    this.mergeClass = mergeClass;
   }
 
   public RowType getRowType() {
@@ -82,6 +85,10 @@ public class MergeOnReadTableState implements Serializable {
     return operationPos;
   }
 
+  public String getMergeClass() {
+    return mergeClass;
+  }
+
   public int[] getRequiredPositions() {
     final List<String> fieldNames = rowType.getFieldNames();
     return requiredRowType.getFieldNames().stream()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index cdacefbf17..2305adeed1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -196,6 +196,7 @@ public class StreamerUtil {
                 .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
                 .build())
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                .withMergeClass(conf.getString(FlinkOptions.MERGE_CLASS_NAME))
                 .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO))
                 .withInlineCompactionTriggerStrategy(
                     CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT)))
@@ -310,6 +311,7 @@ public class StreamerUtil {
           .setTableName(conf.getString(FlinkOptions.TABLE_NAME))
           .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null))
           .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
+          .setMergeClassName(conf.getString(FlinkOptions.MERGE_CLASS_NAME))
           .setPreCombineField(OptionsResolver.getPreCombineField(conf))
           .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
           .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null))
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 63d5c1f6bd..f2095c1844 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -263,7 +263,8 @@ public class TestStreamReadOperator {
         tableAvroSchema.toString(),
         AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
         Collections.emptyList(),
-        new String[0]);
+        new String[0],
+        metaClient.getTableConfig().getMergeClass());
     MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder()
         .config(conf)
         .tableState(hoodieTableState)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 98053f7821..162e85f3a6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -763,9 +763,9 @@ public class TestData {
         if (scanner != null) {
           for (String curKey : scanner.getRecords().keySet()) {
             if (!keyToSkip.contains(curKey)) {
-              Option<GenericRecord> record = (Option<GenericRecord>) scanner.getRecords()
-                  .get(curKey).getData()
-                  .getInsertValue(schema, config.getProps());
+              Option<GenericRecord> record = (Option<GenericRecord>) ((HoodieAvroRecord) scanner.getRecords()
+                      .get(curKey)).getData()
+                      .getInsertValue(schema, config.getProps());
               if (record.isPresent()) {
                 readBuffer.add(filterOutVariables(record.get()));
               }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index db11bd6717..733bdda275 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 57cab09377..51b4b7ea46 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -308,6 +308,12 @@ object DataSourceWriteOptions {
    */
   val PAYLOAD_CLASS_NAME = HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME
 
+  /**
+   * The HoodieMerge implementation replaces the record payload for merging data
+   * and provides the same capabilities as the payload.
+   */
+  val MERGE_CLASS_NAME = HoodieWriteConfig.MERGE_CLASS_NAME
+
   /**
    * Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value
    * will be obtained by invoking .toString() on the field value. Nested fields can be specified using
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index bff829cff6..569ffa690b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -42,7 +42,6 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
-import org.apache.hudi.io.storage.HoodieHFileReader
 import org.apache.hudi.io.storage.HoodieAvroHFileReader
 
 import org.apache.spark.SerializableWritable
@@ -79,7 +78,8 @@ case class HoodieTableState(tablePath: String,
                             preCombineFieldOpt: Option[String],
                             usesVirtualKeys: Boolean,
                             recordPayloadClassName: String,
-                            metadataConfig: HoodieMetadataConfig)
+                            metadataConfig: HoodieMetadataConfig,
+                            mergeClass: String)
 
 /**
  * Hoodie BaseRelation which extends [[PrunedFilteredScan]].
@@ -468,7 +468,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       preCombineFieldOpt = preCombineFieldOpt,
       usesVirtualKeys = !tableConfig.populateMetaFields(),
       recordPayloadClassName = tableConfig.getPayloadClass,
-      metadataConfig = fileIndex.metadataConfig
+      metadataConfig = fileIndex.metadataConfig,
+      mergeClass = tableConfig.getMergeClass
     )
   }
 
@@ -747,7 +748,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
 
       reader.getRecordIterator(requiredAvroSchema).asScala
         .map(record => {
-          avroToRowConverter.apply(record).get
+          avroToRowConverter.apply(record.asInstanceOf[GenericRecord]).get
         })
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 0c0d1e2feb..32bab703d7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -150,6 +150,7 @@ object HoodieSparkSqlWriter {
           .setBaseFileFormat(baseFileFormat)
           .setArchiveLogFolder(archiveLogFolder)
           .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+          .setMergeClassName(hoodieConfig.getString(MERGE_CLASS_NAME))
           // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
           // but we are interested in what user has set, hence fetching from optParams.
           .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
@@ -489,6 +490,7 @@ object HoodieSparkSqlWriter {
           .setRecordKeyFields(recordKeyFields)
           .setArchiveLogFolder(archiveLogFolder)
           .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
+          .setMergeClassName(hoodieConfig.getStringOrDefault(MERGE_CLASS_NAME))
           .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
           .setBootstrapIndexClass(bootstrapIndexClass)
           .setBaseFileFormat(baseFileFormat)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala
new file mode 100644
index 0000000000..54e4010fe1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+import org.apache.avro.Schema
+import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, fromJavaDate, toJavaDate}
+import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
+import org.apache.hudi.common.util.ValidationUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.HoodieCatalystExpressionUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, MutableProjection, Projection}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+import scala.collection.mutable
+
+/**
+ * Helper class to do common stuff across Spark InternalRow.
+ * Provides common methods similar to {@link HoodieAvroUtils}.
+ */
+object HoodieInternalRowUtils {
+
+  val projectionMap = new ConcurrentHashMap[(StructType, StructType), MutableProjection]
+  val schemaMap = new ConcurrentHashMap[Schema, StructType]
+  val SchemaPosMap = new ConcurrentHashMap[StructType, Map[String, (StructField, Int)]]
+
+  def stitchRecords(left: InternalRow, leftSchema: StructType, right: InternalRow, rightSchema: StructType, stitchedSchema: StructType): InternalRow = {
+    val mergeSchema = StructType(leftSchema.fields ++ rightSchema.fields)
+    val row = new JoinedRow(left, right)
+    val projection = getCachedProjection(mergeSchema, stitchedSchema)
+    projection(row)
+  }
+
+  def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = {
+    val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]])
+
+    val oldFieldMap = getCachedSchemaPosMap(oldSchema)
+    for ((field, pos) <- newSchema.fields.zipWithIndex) {
+      var oldValue: AnyRef = null
+      if (oldFieldMap.contains(field.name)) {
+        val (oldField, oldPos) = oldFieldMap(field.name)
+        oldValue = oldRecord.get(oldPos, oldField.dataType)
+      }
+      if (oldValue != null) {
+        field.dataType match {
+          case structType: StructType =>
+            val oldStructType = oldFieldMap(field.name)._1.dataType.asInstanceOf[StructType]
+            newRow.update(pos, rewriteRecord(oldValue.asInstanceOf[InternalRow], oldStructType, structType))
+          case decimalType: DecimalType =>
+            val oldDecimalType = oldFieldMap(field.name)._1.dataType.asInstanceOf[DecimalType]
+            if (decimalType.scale != oldDecimalType.scale || decimalType.precision != oldDecimalType.precision) {
+              newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(decimalType.scale)))
+            } else {
+              newRow.update(pos, oldValue)
+            }
+          case _ =>
+            newRow.update(pos, oldValue)
+        }
+      } else {
+        // TODO default value in newSchema
+      }
+    }
+
+    newRow
+  }
+
+  def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: util.Map[String, String]): InternalRow = {
+    rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new util.LinkedList[String]).asInstanceOf[InternalRow]
+  }
+
+  private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: util.Map[String, String], fieldNames: util.Deque[String]): Any = {
+    if (oldRecord == null) {
+      null
+    } else {
+      newSchema match {
+        case targetSchema: StructType =>
+          ValidationUtils.checkArgument(oldRecord.isInstanceOf[InternalRow], "cannot rewrite record with different type")
+          val oldRow = oldRecord.asInstanceOf[InternalRow]
+          val helper = mutable.Map[Integer, Any]()
+
+          val oldSchemaPos = getCachedSchemaPosMap(oldSchema.asInstanceOf[StructType])
+          targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
+            fieldNames.push(field.name)
+            if (oldSchemaPos.contains(field.name)) {
+              val (oldField, oldPos) = oldSchemaPos(field.name)
+              helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
+            } else {
+              val fieldFullName = createFullName(fieldNames)
+              val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.")
+              val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1)
+              // deal with rename
+              if (!oldSchemaPos.contains(field.name) && oldSchemaPos.contains(lastColNameFromOldSchema)) {
+                // find rename
+                val (oldField, oldPos) = oldSchemaPos(lastColNameFromOldSchema)
+                helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
+              }
+            }
+            fieldNames.pop()
+          }
+          val newRow = new GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]])
+          targetSchema.fields.zipWithIndex.foreach { case (_, i) =>
+            if (helper.contains(i)) {
+              newRow.update(i, helper(i))
+            } else {
+              // TODO add default val
+              newRow.update(i, null)
+            }
+          }
+
+          newRow
+        case targetSchema: ArrayType =>
+          ValidationUtils.checkArgument(oldRecord.isInstanceOf[ArrayData], "cannot rewrite record with different type")
+          val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
+          val oldArray = oldRecord.asInstanceOf[ArrayData]
+          val newElementType = targetSchema.elementType
+          val newArray = new GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]])
+          fieldNames.push("element")
+          oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) => newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames)) }
+          fieldNames.pop()
+
+          newArray
+        case targetSchema: MapType =>
+          ValidationUtils.checkArgument(oldRecord.isInstanceOf[MapData], "cannot rewrite record with different type")
+          val oldValueType = oldSchema.asInstanceOf[MapType].valueType
+          val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
+          val oldMap = oldRecord.asInstanceOf[MapData]
+          val newValueType = targetSchema.valueType
+          val newKeyArray = new GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]])
+          val newValueArray = new GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]])
+          val newMap = new ArrayBasedMapData(newKeyArray, newValueArray)
+          fieldNames.push("value")
+          oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, value) }
+          oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) => newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames)) }
+          fieldNames.pop()
+
+          newMap
+        case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema)
+      }
+    }
+  }
+
+  def rewriteRecordWithMetadata(record: InternalRow, oldSchema: StructType, newSchema: StructType, fileName: String): InternalRow = {
+    val newRecord = rewriteRecord(record, oldSchema, newSchema)
+    newRecord.update(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal, fileName)
+
+    newRecord
+  }
+
+  def rewriteEvolutionRecordWithMetadata(record: InternalRow, oldSchema: StructType, newSchema: StructType, fileName: String): InternalRow = {
+    val newRecord = rewriteRecordWithNewSchema(record, oldSchema, newSchema, new util.HashMap[String, String]())
+    newRecord.update(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal, fileName)
+
+    newRecord
+  }
+
+  def getCachedSchema(schema: Schema): StructType = {
+    if (!schemaMap.containsKey(schema)) {
+      schemaMap.synchronized {
+        if (!schemaMap.containsKey(schema)) {
+          val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          schemaMap.put(schema, structType)
+        }
+      }
+    }
+    schemaMap.get(schema)
+  }
+
+  private def getCachedProjection(from: StructType, to: StructType): Projection = {
+    val schemaPair = (from, to)
+    if (!projectionMap.containsKey(schemaPair)) {
+      projectionMap.synchronized {
+        if (!projectionMap.containsKey(schemaPair)) {
+          val projection = HoodieCatalystExpressionUtils.generateMutableProjection(from, to)
+          projectionMap.put(schemaPair, projection)
+        }
+      }
+    }
+    projectionMap.get(schemaPair)
+  }
+
+  def getCachedSchemaPosMap(schema: StructType): Map[String, (StructField, Int)] = {
+    if (!SchemaPosMap.contains(schema)) {
+      SchemaPosMap.synchronized {
+        if (!SchemaPosMap.contains(schema)) {
+          val fieldMap = schema.fields.zipWithIndex.map { case (field, i) => (field.name, (field, i)) }.toMap
+          SchemaPosMap.put(schema, fieldMap)
+        }
+      }
+    }
+    SchemaPosMap.get(schema)
+  }
+
+  private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
+    if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
+      oldSchema match {
+        case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | StringType | DateType | TimestampType | BinaryType =>
+          oldValue
+        case DecimalType() =>
+          Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
+        case _ =>
+          throw new HoodieException("Unknown schema type: " + newSchema)
+      }
+    } else {
+      rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema)
+    }
+  }
+
+  private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
+    val value = newSchema match {
+      case NullType | BooleanType =>
+      case DateType if oldSchema.equals(StringType) =>
+        fromJavaDate(java.sql.Date.valueOf(oldValue.toString))
+      case LongType =>
+        oldSchema match {
+          case IntegerType => oldValue.asInstanceOf[Int].longValue()
+          case _ =>
+        }
+      case FloatType =>
+        oldSchema match {
+          case IntegerType => oldValue.asInstanceOf[Int].floatValue()
+          case LongType => oldValue.asInstanceOf[Long].floatValue()
+          case _ =>
+        }
+      case DoubleType =>
+        oldSchema match {
+          case IntegerType => oldValue.asInstanceOf[Int].doubleValue()
+          case LongType => oldValue.asInstanceOf[Long].doubleValue()
+          case FloatType => java.lang.Double.valueOf(oldValue.asInstanceOf[Float] + "")
+          case _ =>
+        }
+      case BinaryType =>
+        oldSchema match {
+          case StringType => oldValue.asInstanceOf[String].getBytes(StandardCharsets.UTF_8)
+          case _ =>
+        }
+      case StringType =>
+        oldSchema match {
+          case BinaryType => new String(oldValue.asInstanceOf[Array[Byte]])
+          case DateType => toJavaDate(oldValue.asInstanceOf[Integer]).toString
+          case IntegerType | LongType | FloatType | DoubleType | DecimalType() => oldValue.toString
+          case _ =>
+        }
+      case DecimalType() =>
+        oldSchema match {
+          case IntegerType | LongType | FloatType | DoubleType | StringType =>
+            val scale = newSchema.asInstanceOf[DecimalType].scale
+
+            Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale))
+          case _ =>
+        }
+      case _ =>
+    }
+    if (value.isInstanceOf[Unit]) { // no supported conversion matched; the empty match arms above evaluate to Unit
+      throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
+    } else {
+      value
+    }
+  }
+}
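
A short, illustrative sketch (not part of the patch; the schema and values are made up) of the widening behaviour implemented above: rewriteRecordWithNewSchema walks the struct and delegates leaf values to rewritePrimaryType, so an int column can be promoted to long in place:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.sql.hudi.HoodieInternalRowUtils
    import org.apache.spark.sql.types._
    import org.apache.spark.unsafe.types.UTF8String

    val oldSchema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    val newSchema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
    val oldRow: InternalRow = new GenericInternalRow(Array[Any](1, UTF8String.fromString("a")))
    // "id" is widened from Int to Long by rewritePrimaryTypeWithDiffSchemaType
    val newRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(
      oldRow, oldSchema, newSchema, new java.util.HashMap[String, String]())
    assert(newRow.get(0, LongType) == 1L)
    assert(newRow.get(1, StringType).toString == "a")
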
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java
new file mode 100644
index 0000000000..a22e78af21
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+
+/**
+ * Spark engine-specific implementation of {@code HoodieRecord}.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, Comparable orderingVal) {
+    super(key, data, orderingVal);
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, HoodieOperation operation, Comparable orderingVal) {
+    super(key, data, operation, orderingVal);
+  }
+
+  public HoodieSparkRecord(HoodieRecord<InternalRow> record) {
+    super(record);
+  }
+
+  public HoodieSparkRecord() {
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance() {
+    return new HoodieSparkRecord(this);
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key, HoodieOperation op) {
+    return new HoodieSparkRecord(key, data, op, getOrderingValue());
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key) {
+    return new HoodieSparkRecord(key, data, getOrderingValue());
+  }
+
+  @Override
+  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+    return getRecordKey();
+  }
+
+  @Override
+  public String getRecordKey(String keyFieldName) {
+    return getRecordKey();
+  }
+
+  @Override
+  public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
+    StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema);
+    InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, readerStructType, (InternalRow) other.getData(), readerStructType, writerStructType);
+    return new HoodieSparkRecord(getKey(), mergeRow, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, readerStructType, targetStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+    InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, writeSchemaWithMetaFieldsStructType, new HashMap<>())
+        : HoodieInternalRowUtils.rewriteRecord(data, readerStructType, writeSchemaWithMetaFieldsStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+    InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteEvolutionRecordWithMetadata(data, readerStructType, writeSchemaWithMetaFieldsStructType, fileName)
+        : HoodieInternalRowUtils.rewriteRecordWithMetadata(data, readerStructType, writeSchemaWithMetaFieldsStructType, fileName);
+    return new HoodieSparkRecord(getKey(), rewriteRow, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, newStructType, renameCols);
+    return new HoodieSparkRecord(getKey(), rewriteRow, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols, Mapper mapper) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, newStructType, renameCols);
+    // TODO change mapper type
+    return mapper.apply((IndexedRecord) rewriteRow);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException {
+    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, readerStructType, newStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, getOperation());
+  }
+
+  @Override
+  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException {
+    data.update(pos, newValue);
+    return this;
+  }
+
+  @Override
+  public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException {
+    Arrays.stream(HoodieMetadataField.values()).forEach(metadataField -> {
+      String value = metadataValues.get(metadataField);
+      if (value != null) {
+        data.update(recordSchema.getField(metadataField.getFieldName()).pos(), value);
+      }
+    });
+    return this;
+  }
+
+  @Override
+  public Option<Map<String, String>> getMetadata() {
+    return Option.empty();
+  }
+
+  @Override
+  public boolean isPresent(Schema schema, Properties prop) throws IOException {
+    if (null == data) {
+      return false;
+    }
+    Object deleteMarker = data.get(schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).pos(), BooleanType);
+    return !(deleteMarker instanceof Boolean && (boolean) deleteMarker);
+  }
+
+  @Override
+  public boolean shouldIgnore(Schema schema, Properties prop) throws IOException {
+    // TODO: the SENTINEL check should be refactored to not rely on Avro (GenericRecord)
+    return data != null && data.equals(SENTINEL);
+  }
+
+  @Override
+  public Option<IndexedRecord> toIndexedRecord(Schema schema, Properties prop) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+}
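
For context, a minimal sketch (illustrative only; the key, values and Avro schema are made up) of wrapping an InternalRow in the new Spark-native record and rewriting it through the APIs above:

    import org.apache.avro.Schema
    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.common.model.HoodieKey
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.sql.hudi.HoodieSparkRecord
    import org.apache.spark.unsafe.types.UTF8String

    val readerSchema: Schema = new Schema.Parser().parse(
      """{"type":"record","name":"r","fields":[{"name":"id","type":"string"},{"name":"age","type":"int"}]}""")
    val key = new HoodieKey("uuid-0", "2022/07/05")
    val row = new GenericInternalRow(Array[Any](UTF8String.fromString("uuid-0"), 18))
    val record = new HoodieSparkRecord(key, row, Integer.valueOf(0)) // dummy ordering value
    // projects the InternalRow from the reader schema into the target schema (identical here)
    val rewritten = record.rewriteRecord(readerSchema, readerSchema, new TypedProperties())
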
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordMerge.java
similarity index 51%
copy from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
copy to hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordMerge.java
index dc5317f7d8..88ae7c13df 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecordMerge.java
@@ -16,18 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.execution.bulkinsert;
+package org.apache.spark.sql.hudi;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.common.model.HoodieMerge;
+import org.apache.hudi.common.util.Option;
 
-import org.apache.spark.api.java.JavaRDD;
+import java.io.IOException;
+import java.util.Properties;
 
-/**
- * Abstract of bucket index bulk_insert partitioner
- * TODO implement partitioner for SIMPLE BUCKET INDEX
- */
-public abstract class RDDBucketIndexPartitioner<T extends HoodieRecordPayload>
-    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
-}
+public class HoodieSparkRecordMerge implements HoodieMerge {
+
+  @Override
+  public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
+    if (older.getData() == null) {
+      // use natural order for delete record
+      return older;
+    }
+    if (older.getOrderingValue().compareTo(newer.getOrderingValue()) > 0) {
+      return older;
+    } else {
+      return newer;
+    }
+  }
+
+  @Override
+  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    return Option.of(newer);
+  }
+}
\ No newline at end of file
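
A small sketch (values are illustrative) of the precombine semantics above: preCombine keeps whichever record carries the larger ordering value, while combineAndGetUpdateValue simply keeps the newer record:

    import org.apache.hudi.common.model.{HoodieKey, HoodieRecord}
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.sql.hudi.{HoodieSparkRecord, HoodieSparkRecordMerge}

    val key = new HoodieKey("uuid-0", "2022/07/05")
    val older = new HoodieSparkRecord(key, new GenericInternalRow(Array[Any](1)), Integer.valueOf(2))
    val newer = new HoodieSparkRecord(key, new GenericInternalRow(Array[Any](2)), Integer.valueOf(1))
    // older carries the larger ordering value, so it wins the precombine
    val kept: HoodieRecord[_] = new HoodieSparkRecordMerge().preCombine(older, newer)
    assert(kept eq older)
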
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
index 1ed0e5e1a4..2421face72 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -48,7 +48,7 @@ object SparkHelpers {
     // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
     parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
 
-    val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
+    val writer = new HoodieAvroParquetWriter(destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
     for (rec <- sourceRecords) {
       val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
       if (!keysToSkip.contains(key)) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
new file mode 100644
index 0000000000..7a08ee64bf
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.testutils.HoodieClientTestUtils
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.hudi.HoodieInternalRowUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SparkSession}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAfterAll {
+
+  private var sparkSession: SparkSession = _
+
+  private val schema1 = StructType(
+    Array(
+      StructField("name", StringType),
+      StructField("age", IntegerType)
+    )
+  )
+  private val schema2 = StructType(
+    Array(
+      StructField("name1", StringType),
+      StructField("age1", IntegerType)
+    )
+  )
+  private val schemaMerge = StructType(schema1.fields ++ schema2.fields)
+  private val schema1WithMetaData = StructType(Array(
+    StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
+    StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
+    StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
+    StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
+    StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType),
+    StructField(HoodieRecord.OPERATION_METADATA_FIELD, StringType),
+    StructField(HoodieRecord.HOODIE_IS_DELETED_FIELD, BooleanType)
+  ) ++ schema1.fields)
+
+  override protected def beforeAll(): Unit = {
+    // Initialize a local spark env
+    val jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(classOf[TestHoodieInternalRowUtils].getName))
+    jsc.setLogLevel("ERROR")
+    sparkSession = SparkSession.builder.config(jsc.getConf).getOrCreate
+  }
+
+  override protected def afterAll(): Unit = {
+    sparkSession.close()
+  }
+
+  test("test merge") {
+    val data1 = sparkSession.sparkContext.parallelize(Seq(Row("like", 18)))
+    val data2 = sparkSession.sparkContext.parallelize(Seq(Row("like1", 181)))
+    val row1 = sparkSession.createDataFrame(data1, schema1).queryExecution.toRdd.first()
+    val row2 = sparkSession.createDataFrame(data2, schema2).queryExecution.toRdd.first()
+    val rowMerge = HoodieInternalRowUtils.stitchRecords(row1, schema1, row2, schema2, schemaMerge)
+    assert(rowMerge.get(0, StringType).toString.equals("like"))
+    assert(rowMerge.get(1, IntegerType) == 18)
+    assert(rowMerge.get(2, StringType).toString.equals("like1"))
+    assert(rowMerge.get(3, IntegerType) == 181)
+  }
+
+  test("test rewrite") {
+    val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18, "like1", 181)))
+    val oldRow = sparkSession.createDataFrame(data, schemaMerge).queryExecution.toRdd.first()
+    val newRow1 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, schema1)
+    val newRow2 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, schema2)
+    assert(newRow1.get(0, StringType).toString.equals("like"))
+    assert(newRow1.get(1, IntegerType) == 18)
+    assert(newRow2.get(0, StringType).toString.equals("like1"))
+    assert(newRow2.get(1, IntegerType) == 181)
+  }
+
+  test("test rewrite with nullable value") {
+    val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18)))
+    val oldRow = sparkSession.createDataFrame(data, schema1).queryExecution.toRdd.first()
+    val newRow = HoodieInternalRowUtils.rewriteRecord(oldRow, schema1, schemaMerge)
+    assert(newRow.get(0, StringType).toString.equals("like"))
+    assert(newRow.get(1, IntegerType) == 18)
+    assert(newRow.get(2, StringType) == null)
+    assert(newRow.get(3, IntegerType) == null)
+  }
+
+  test("test rewrite with metaDataFiled value") {
+    val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18)))
+    val oldRow = sparkSession.createDataFrame(data, schema1).queryExecution.toRdd.first()
+    val newRow = HoodieInternalRowUtils.rewriteRecordWithMetadata(oldRow, schema1, schema1WithMetaData, "file1")
+    assert(newRow.get(0, StringType) == null)
+    assert(newRow.get(1, StringType) == null)
+    assert(newRow.get(2, StringType) == null)
+    assert(newRow.get(3, StringType) == null)
+    assert(newRow.get(4, StringType).toString.equals("file1"))
+    assert(newRow.get(5, StringType) == null)
+    assert(newRow.get(6, BooleanType) == null)
+    assert(newRow.get(7, StringType).toString.equals("like"))
+    assert(newRow.get(8, IntegerType) == 18)
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala
new file mode 100644
index 0000000000..cb5529721c
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestStructTypeSchemaEvolutionUtils.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.nio.ByteBuffer
+import java.util.{ArrayList, HashMap, Objects}
+import org.apache.avro.generic.GenericData
+import org.apache.avro.{LogicalTypes, Schema}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.internal.schema.Types
+import org.apache.hudi.internal.schema.action.TableChanges
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.utils.SchemaChangeUtils
+import org.apache.hudi.testutils.HoodieClientTestUtils
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.hudi.HoodieInternalRowUtils
+import org.apache.spark.sql.types._
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with BeforeAndAfterAll {
+  private var sparkSession: SparkSession = _
+
+  override protected def beforeAll(): Unit = {
+    // Initialize a local spark env
+    val jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(classOf[TestHoodieInternalRowUtils].getName))
+    jsc.setLogLevel("ERROR")
+    sparkSession = SparkSession.builder.config(jsc.getConf).getOrCreate
+  }
+
+  override protected def afterAll(): Unit = {
+    sparkSession.close()
+  }
+
+  /**
+   * test record data type changes.
+   * int => long/float/double/string
+   * long => float/double/string
+   * float => double/String
+   * double => String/Decimal
+   * Decimal => Decimal/String
+   * String => date/decimal
+   * date => String
+   */
+  test("test rewrite record with type changed") {
+    val avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\""
+      + ":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},"
+      + "{\"name\":\"comb\",\"type\":[\"null\",\"int\"],\"default\":null},"
+      + "{\"name\":\"com1\",\"type\":[\"null\",\"int\"],\"default\":null},"
+      + "{\"name\":\"col0\",\"type\":[\"null\",\"int\"],\"default\":null},"
+      + "{\"name\":\"col1\",\"type\":[\"null\",\"long\"],\"default\":null},"
+      + "{\"name\":\"col11\",\"type\":[\"null\",\"long\"],\"default\":null},"
+      + "{\"name\":\"col12\",\"type\":[\"null\",\"long\"],\"default\":null},"
+      + "{\"name\":\"col2\",\"type\":[\"null\",\"float\"],\"default\":null},"
+      + "{\"name\":\"col21\",\"type\":[\"null\",\"float\"],\"default\":null},"
+      + "{\"name\":\"col3\",\"type\":[\"null\",\"double\"],\"default\":null},"
+      + "{\"name\":\"col31\",\"type\":[\"null\",\"double\"],\"default\":null},"
+      + "{\"name\":\"col4\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col4\","
+      + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null},"
+      + "{\"name\":\"col41\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col41\","
+      + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null},"
+      + "{\"name\":\"col5\",\"type\":[\"null\",\"string\"],\"default\":null},"
+      + "{\"name\":\"col51\",\"type\":[\"null\",\"string\"],\"default\":null},"
+      + "{\"name\":\"col6\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null},"
+      + "{\"name\":\"col7\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}],\"default\":null},"
+      + "{\"name\":\"col8\",\"type\":[\"null\",\"boolean\"],\"default\":null},"
+      + "{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"par\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}")
+    // create a test record with avroSchema
+    val avroRecord = new GenericData.Record(avroSchema)
+    avroRecord.put("id", 1)
+    avroRecord.put("comb", 100)
+    avroRecord.put("com1", -100)
+    avroRecord.put("col0", 256)
+    avroRecord.put("col1", 1000L)
+    avroRecord.put("col11", -100L)
+    avroRecord.put("col12", 2000L)
+    avroRecord.put("col2", -5.001f)
+    avroRecord.put("col21", 5.001f)
+    avroRecord.put("col3", 12.999d)
+    avroRecord.put("col31", 9999.999d)
+    val currentDecimalType = avroSchema.getField("col4").schema.getTypes.get(1)
+    val bd = new java.math.BigDecimal("123.456").setScale(currentDecimalType.getLogicalType.asInstanceOf[LogicalTypes.Decimal].getScale)
+    avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, currentDecimalType, currentDecimalType.getLogicalType))
+    val currentDecimalType1 = avroSchema.getField("col41").schema.getTypes.get(1)
+    val bd1 = new java.math.BigDecimal("7890.456").setScale(currentDecimalType1.getLogicalType.asInstanceOf[LogicalTypes.Decimal].getScale)
+    avroRecord.put("col41", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd1, currentDecimalType1, currentDecimalType1.getLogicalType))
+    avroRecord.put("col5", "2011-01-01")
+    avroRecord.put("col51", "199.342")
+    avroRecord.put("col6", 18987)
+    avroRecord.put("col7", 1640491505000000L)
+    avroRecord.put("col8", false)
+    val bb = ByteBuffer.wrap(Array[Byte](97, 48, 53))
+    avroRecord.put("col9", bb)
+    assert(GenericData.get.validate(avroSchema, avroRecord))
+    val internalSchema = AvroInternalSchemaConverter.convert(avroSchema)
+    // apply the column type changes
+    val updateChange = TableChanges.ColumnUpdateChange.get(internalSchema)
+    updateChange.updateColumnType("id", Types.LongType.get).updateColumnType("comb", Types.FloatType.get).updateColumnType("com1", Types.DoubleType.get).updateColumnType("col0", Types.StringType.get).updateColumnType("col1", Types.FloatType.get).updateColumnType("col11", Types.DoubleType.get).updateColumnType("col12", Types.StringType.get).updateColumnType("col2", Types.DoubleType.get).updateColumnType("col21", Types.StringType.get).updateColumnType("col3", Types.StringType.get).updateCo [...]
+    val newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange)
+    val newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName)
+    val newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap[String, String])
+    assert(GenericData.get.validate(newAvroSchema, newRecord))
+    // Convert avro to internalRow
+    val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(avroSchema)
+    val newStructTypeSchema = HoodieInternalRowUtils.getCachedSchema(newAvroSchema)
+    val row = AvroConversionUtils.createAvroToInternalRowConverter(avroSchema, structTypeSchema).apply(avroRecord).get
+    val newRowExpected = AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, newStructTypeSchema)
+      .apply(newRecord).get
+    val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, structTypeSchema, newStructTypeSchema, new HashMap[String, String])
+    internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema)
+  }
+
+  test("test rewrite nest record") {
+    val record = Types.RecordType.get(Types.Field.get(0, false, "id", Types.IntType.get()),
+      Types.Field.get(1, true, "data", Types.StringType.get()),
+      Types.Field.get(2, true, "preferences",
+        Types.RecordType.get(Types.Field.get(5, false, "feature1",
+          Types.BooleanType.get()), Types.Field.get(6, true, "feature2", Types.BooleanType.get()))),
+      Types.Field.get(3, false, "doubles", Types.ArrayType.get(7, false, Types.DoubleType.get())),
+      Types.Field.get(4, false, "locations", Types.MapType.get(8, 9, Types.StringType.get(),
+        Types.RecordType.get(Types.Field.get(10, false, "lat", Types.FloatType.get()), Types.Field.get(11, false, "long", Types.FloatType.get())), false))
+    )
+    val schema = AvroInternalSchemaConverter.convert(record, "test1")
+    val avroRecord = new GenericData.Record(schema)
+    GenericData.get.validate(schema, avroRecord)
+    avroRecord.put("id", 2)
+    avroRecord.put("data", "xs")
+    // fill record type
+    val preferencesRecord = new GenericData.Record(AvroInternalSchemaConverter.convert(record.fieldType("preferences"), "test1_preferences"))
+    preferencesRecord.put("feature1", false)
+    preferencesRecord.put("feature2", true)
+    assert(GenericData.get.validate(AvroInternalSchemaConverter.convert(record.fieldType("preferences"), "test1_preferences"), preferencesRecord))
+    avroRecord.put("preferences", preferencesRecord)
+    // fill mapType
+    val locations = new HashMap[String, GenericData.Record]
+    val mapSchema = AvroInternalSchemaConverter.convert(record.field("locations").`type`.asInstanceOf[Types.MapType].valueType, "test1_locations")
+    val locationsValue: GenericData.Record = new GenericData.Record(mapSchema)
+    locationsValue.put("lat", 1.2f)
+    locationsValue.put("long", 1.4f)
+    val locationsValue1: GenericData.Record = new GenericData.Record(mapSchema)
+    locationsValue1.put("lat", 2.2f)
+    locationsValue1.put("long", 2.4f)
+    locations.put("key1", locationsValue)
+    locations.put("key2", locationsValue1)
+    avroRecord.put("locations", locations)
+    val doubles = new ArrayList[Double]
+    doubles.add(2.0d)
+    doubles.add(3.0d)
+    avroRecord.put("doubles", doubles)
+    // do check
+    assert(GenericData.get.validate(schema, avroRecord))
+    // create newSchema
+    val newRecord = Types.RecordType.get(Types.Field.get(0, false, "id", Types.IntType.get), Types.Field.get(1, true, "data", Types.StringType.get), Types.Field.get(2, true, "preferences", Types.RecordType.get(Types.Field.get(5, false, "feature1", Types.BooleanType.get), Types.Field.get(5, true, "featurex", Types.BooleanType.get), Types.Field.get(6, true, "feature2", Types.BooleanType.get))), Types.Field.get(3, false, "doubles", Types.ArrayType.get(7, false, Types.DoubleType.get)), Types [...]
+    val newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName)
+    val newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap[String, String])
+    // verify the correctness of the rewrite
+    assert(GenericData.get.validate(newAvroSchema, newAvroRecord))
+    // Convert avro to internalRow
+    val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(schema)
+    val newStructTypeSchema = HoodieInternalRowUtils.getCachedSchema(newAvroSchema)
+    val row = AvroConversionUtils.createAvroToInternalRowConverter(schema, structTypeSchema).apply(avroRecord).get
+    val newRowExpected = AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, newStructTypeSchema).apply(newAvroRecord).get
+    val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, structTypeSchema, newStructTypeSchema, new HashMap[String, String])
+    internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema)
+  }
+
+  private def internalRowCompare(expected: Any, actual: Any, schema: DataType): Unit = {
+    schema match {
+      case StructType(fields) =>
+        val expectedRow = expected.asInstanceOf[InternalRow]
+        val actualRow = actual.asInstanceOf[InternalRow]
+        fields.zipWithIndex.foreach { case (field, i) => internalRowCompare(expectedRow.get(i, field.dataType), actualRow.get(i, field.dataType), field.dataType) }
+      case ArrayType(elementType, _) =>
+        val expectedArray = expected.asInstanceOf[ArrayData].toSeq[Any](elementType)
+        val actualArray = actual.asInstanceOf[ArrayData].toSeq[Any](elementType)
+        if (expectedArray.size != actualArray.size) {
+          throw new AssertionError()
+        } else {
+          expectedArray.zip(actualArray).foreach { case (e1, e2) => internalRowCompare(e1, e2, elementType) }
+        }
+      case MapType(keyType, valueType, _) =>
+        val expectedKeyArray = expected.asInstanceOf[MapData].keyArray()
+        val expectedValueArray = expected.asInstanceOf[MapData].valueArray()
+        val actualKeyArray = actual.asInstanceOf[MapData].keyArray()
+        val actualValueArray = actual.asInstanceOf[MapData].valueArray()
+        internalRowCompare(expectedKeyArray, actualKeyArray, ArrayType(keyType))
+        internalRowCompare(expectedValueArray, actualValueArray, ArrayType(valueType))
+      case StringType => if (checkNull(expected, actual) || !expected.toString.equals(actual.toString)) {
+        throw new AssertionError(String.format("%s is not equals %s", expected.toString, actual.toString))
+      }
+      // TODO Verify after 'https://github.com/apache/hudi/pull/5907' merge
+      case BinaryType => if (checkNull(expected, actual) || !expected.asInstanceOf[Array[Byte]].sameElements(actual.asInstanceOf[Array[Byte]])) {
+      // throw new AssertionError(String.format("%s is not equals %s", expected.toString, actual.toString))
+      }
+      case _ => if (!Objects.equals(expected, actual)) {
+      // throw new AssertionError(String.format("%s is not equals %s", expected.toString, actual.toString))
+      }
+    }
+  }
+
+  private def checkNull(left: Any, right: Any): Boolean = {
+    (left == null && right != null) || (left != null && right == null)
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 3fbb8c0f8d..1f5756a7ae 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -277,6 +277,7 @@ public class DeltaSync implements Serializable, Closeable {
           .setTableName(cfg.targetTableName)
           .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
           .setPayloadClassName(cfg.payloadClassName)
+          .setMergeClassName(cfg.mergeClassName)
           .setBaseFileFormat(cfg.baseFileFormat)
           .setPartitionFields(partitionColumns)
           .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
@@ -375,6 +376,7 @@ public class DeltaSync implements Serializable, Closeable {
           .setTableName(cfg.targetTableName)
           .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
           .setPayloadClassName(cfg.payloadClassName)
+          .setMergeClassName(cfg.mergeClassName)
           .setBaseFileFormat(cfg.baseFileFormat)
           .setPartitionFields(partitionColumns)
           .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key()))
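
For completeness, a minimal sketch (illustrative only; all other mandatory DeltaStreamer options are omitted) of pointing the new merge-class option, added to HoodieDeltaStreamer.Config below, at the Spark merge implementation from this patch; DeltaSync then persists the class name into the table config via setMergeClassName(...):

    import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
    import org.apache.spark.sql.hudi.HoodieSparkRecordMerge

    val cfg = new HoodieDeltaStreamer.Config()
    cfg.targetTableName = "h0" // hypothetical table name
    cfg.mergeClassName = classOf[HoodieSparkRecordMerge].getName
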
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 74cb3e31df..d0c291ba8e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroRecordMerge;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -274,6 +275,10 @@ public class HoodieDeltaStreamer implements Serializable {
         + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
     public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
 
+    @Parameter(names = {"--merge-class"}, description = "Implements of HoodieMerge, that defines how to merge two records."
+        + "Implement your own, if you want to implement specific record merge logic.")
+    public String mergeClassName = HoodieAvroRecordMerge.class.getName();
+
     @Parameter(names = {"--schemaprovider-class"}, description = "subclass of org.apache.hudi.utilities.schema"
         + ".SchemaProvider to attach schemas to input & target table data, built in options: "
         + "org.apache.hudi.utilities.schema.FilebasedSchemaProvider."