Posted to commits@hudi.apache.org by yu...@apache.org on 2022/10/06 13:08:03 UTC

[hudi] 07/07: [RFC-46][HUDI-4414] Update the RFC-46 doc to fix comments feedback (#6132)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 517ca54fe2fda67158ecbbc01d1e096441eccf1a
Author: komao <ma...@gmail.com>
AuthorDate: Thu Sep 22 11:17:54 2022 +0800

    [RFC-46][HUDI-4414] Update the RFC-46 doc to fix comments feedback (#6132)
    
    * Update the RFC-46 doc to fix comments feedback
    
    * fix
    
    Co-authored-by: wangzixuan.wzxuan <wa...@bytedance.com>
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   2 +-
 .../java/org/apache/hudi/io/HoodieCDCLogger.java   |   5 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   5 +-
 .../apache/hudi/io/HoodieMergeHandleFactory.java   |   5 +-
 .../hudi/io/HoodieMergeHandleWithChangeLog.java    |  23 ++-
 .../io/HoodieSortedMergeHandleWithChangeLog.java   |  15 +-
 .../hudi/table/action/commit/BaseWriteHelper.java  |   4 +-
 .../table/action/commit/HoodieWriteHelper.java     |   1 -
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   1 -
 .../FlinkSizeBasedClusteringPlanStrategy.java      |   3 +-
 .../FlinkMergeAndReplaceHandleWithChangeLog.java   |  19 +--
 .../hudi/io/FlinkMergeHandleWithChangeLog.java     |  19 +--
 .../apache/hudi/io/FlinkWriteHandleFactory.java    |  23 ++-
 .../hudi/table/action/commit/FlinkWriteHelper.java |   1 -
 .../hudi/table/action/commit/JavaWriteHelper.java  |   1 -
 .../MultipleSparkJobExecutionStrategy.java         |   2 +-
 .../bulkinsert/RDDSpatialCurveSortPartitioner.java |   2 -
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |   2 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   8 +-
 .../table/log/HoodieCDCLogRecordIterator.java      |   6 +-
 .../table/log/block/HoodieAvroDataBlock.java       |   3 +
 .../common/table/log/block/HoodieCDCDataBlock.java |   7 +-
 .../org/apache/hudi/sink/StreamWriteFunction.java  |   2 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |   4 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   3 +-
 .../scala/org/apache/hudi/LogFileIterator.scala    | 103 ++++++++-----
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   |  22 +--
 .../apache/hudi/functional/TestCOWDataSource.scala |   5 +-
 .../hudi/functional/cdc/HoodieCDCTestBase.scala    |  12 +-
 rfc/rfc-46/rfc-46.md                               | 169 ++++++++++++++++-----
 30 files changed, 310 insertions(+), 167 deletions(-)

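For readers skimming the patch: the common thread across these hunks is the RFC-46 shift from the payload-bound generic <T extends HoodieRecordPayload> to a plain <T>, with record combining delegated to an engine-level HoodieRecordMerger. The following Java sketch is illustrative only and is not part of the commit; the calls it uses (HoodieRecordUtils.loadRecordMerger, the HoodieAvroIndexedRecord wrapper, merger.merge) appear in the hunks below, while the wrapper class, method name, thrown exception, and the Option<HoodieRecord> return type are assumptions inferred from the Scala usage in LogFileIterator.scala.

import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;

// Hypothetical helper class, not present in the commit.
public class RecordMergerSketch {

  // Merge an existing base-file Avro record with an incoming HoodieRecord through the
  // engine-agnostic merger, instead of calling payload.combineAndGetUpdateValue(...).
  public static Option<HoodieRecord> mergeWithAvroMerger(IndexedRecord older,
                                                         HoodieRecord newer,
                                                         Schema readerSchema,
                                                         Properties props) throws IOException {
    // Same loading pattern as the updated TestHoodieClientOnCopyOnWriteStorage hunk.
    HoodieRecordMerger merger =
        HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
    // Wrap the plain Avro record so both sides are HoodieRecords, as the merge handles now do.
    HoodieRecord olderRecord = new HoodieAvroIndexedRecord(older);
    // Option<HoodieRecord> return type is inferred from the LogFileIterator.scala hunk below.
    return merger.merge(olderRecord, newer, readerSchema, props);
  }
}

The same pattern explains the signature changes in the merge handles: callers now pass HoodieRecord / Option<HoodieRecord> instead of GenericRecord / Option<IndexedRecord>, and unwrap to Avro only where a GenericRecord is still required (e.g. HoodieCDCLogger.put).
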
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 851d201f63..1984f325f0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -119,7 +119,7 @@ import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
  * Abstract Write Client providing functionality for performing commit, index updates and rollback
  * Reused for regular write operations like upsert/insert/bulk-insert.. as well as bootstrap
  *
- * @param <T> Sub type of HoodieRecordPayload
+ * @param <T> Type of data
  * @param <I> Type of inputs
  * @param <K> Type of keys
  * @param <O> Type of outputs
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index 303eea76db..50b87e3989 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -20,6 +20,7 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -145,10 +146,10 @@ public class HoodieCDCLogger implements Closeable {
     }
 
     try {
-      List<IndexedRecord> records = cdcData.values().stream()
+      List<HoodieRecord> records = cdcData.values().stream()
           .map(record -> {
             try {
-              return record.getInsertValue(cdcSchema).get();
+              return new HoodieAvroIndexedRecord(record.getInsertValue(cdcSchema).get());
             } catch (IOException e) {
               throw new HoodieIOException("Failed to get cdc record", e);
             }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index fef8fa8853..0933d9f28b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -267,7 +267,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
         + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp) throws IOException {
+  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp) throws IOException {
     boolean isDelete = false;
     Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
     if (combineRecordOp.isPresent()) {
@@ -292,7 +292,8 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     writeInsertRecord(hoodieRecord, Option.of(hoodieRecord), schema, config.getProps());
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> insertRecord, Schema schema, Properties prop) {
+  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> insertRecord, Schema schema, Properties prop)
+      throws IOException {
     if (writeRecord(hoodieRecord, insertRecord, schema, prop, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
       insertRecordsWritten++;
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
index 436eff5dac..b110c2c081 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
@@ -21,7 +21,6 @@ package org.apache.hudi.io;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -38,7 +37,7 @@ public class HoodieMergeHandleFactory {
   /**
    * Creates a merge handle for normal write path.
    */
-  public static <T extends HoodieRecordPayload, I, K, O> HoodieMergeHandle<T, I, K, O> create(
+  public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
       WriteOperationType operationType,
       HoodieWriteConfig writeConfig,
       String instantTime,
@@ -70,7 +69,7 @@ public class HoodieMergeHandleFactory {
   /**
    * Creates a merge handle for compaction path.
    */
-  public static <T extends HoodieRecordPayload, I, K, O> HoodieMergeHandle<T, I, K, O> create(
+  public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
       HoodieWriteConfig writeConfig,
       String instantTime,
       HoodieTable<T, I, K, O> table,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index 910bc42158..6e8fda0b10 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -20,10 +20,10 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.util.Option;
@@ -31,9 +31,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -41,7 +43,7 @@ import java.util.Map;
 /**
  * A merge handle that supports logging change logs.
  */
-public class HoodieMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
   protected final HoodieCDCLogger cdcLogger;
 
   public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -73,19 +75,24 @@ public class HoodieMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, K,
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
-    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, indexedRecord);
+  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+      throws IOException {
+    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, combineRecordOp);
     if (result) {
       boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
-      cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() : indexedRecord);
+      cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord) oldRecord).getData(), isDelete ? Option.empty() : combineRecordOp.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
     }
     return result;
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
-    super.writeInsertRecord(hoodieRecord, insertRecord);
+  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
+    // Get the data before deflated
+    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+    Option<IndexedRecord> recordOption = hoodieRecord.toIndexedRecord(schema, this.config.getProps())
+        .map(HoodieRecord::getData);
+    super.writeInsertRecord(hoodieRecord);
     if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
-      cdcLogger.put(hoodieRecord, null, insertRecord);
+      cdcLogger.put(hoodieRecord, null, recordOption);
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
index 8d317b709a..727765b3e2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
@@ -19,23 +19,25 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.Schema;
 
+import java.io.IOException;
+import java.util.Properties;
 import java.util.Iterator;
 import java.util.Map;
 
 /**
  * A sorted merge handle that supports logging change logs.
  */
-public class HoodieSortedMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandleWithChangeLog<T, I, K, O> {
+public class HoodieSortedMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandleWithChangeLog<T, I, K, O> {
   public HoodieSortedMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                                               Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                                               TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
@@ -51,9 +53,10 @@ public class HoodieSortedMergeHandleWithChangeLog<T extends HoodieRecordPayload,
     super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
   }
 
-  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
-    final boolean result = super.writeRecord(hoodieRecord, insertRecord);
-    this.cdcLogger.put(hoodieRecord, null, insertRecord);
+  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord> insertRecord, Schema schema, Properties props)
+      throws IOException {
+    final boolean result = super.writeRecord(hoodieRecord, insertRecord, schema, props);
+    this.cdcLogger.put(hoodieRecord, null, insertRecord.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
     return result;
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 4ac5b3d714..0b96529379 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -84,9 +84,9 @@ public abstract class BaseWriteHelper<T, I, K, O, R> {
   public I deduplicateRecords(
       I records, HoodieTable<T, I, K, O> table, int parallelism) {
     HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
-    return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), merge);
+    return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
   }
 
   public abstract I deduplicateRecords(
-      I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieMerge merge);
+      I records, HoodieIndex<?, ?> index, int parallelism, String schema, Properties props, HoodieRecordMerger merge);
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 0ed220dea9..bd495f69da 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 9085f392a9..1da892d7c4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
index 891ddf8993..3abffe38d8 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -48,7 +47,7 @@ import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_C
  * 1) Creates clustering groups based on max size allowed per group.
  * 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
  */
-public class FlinkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
+public class FlinkSizeBasedClusteringPlanStrategy<T>
     extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
   private static final Logger LOG = LogManager.getLogger(FlinkSizeBasedClusteringPlanStrategy.class);
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index 62b5481cf9..fa14ba0418 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -20,9 +20,9 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.util.Option;
@@ -30,9 +30,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
@@ -42,7 +42,7 @@ import java.util.List;
  * <p>The cdc about logic is copied from {@link HoodieMergeHandleWithChangeLog},
  * we should refactor it out when there are good abstractions.
  */
-public class FlinkMergeAndReplaceHandleWithChangeLog<T extends HoodieRecordPayload, I, K, O>
+public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O>
     extends FlinkMergeAndReplaceHandle<T, I, K, O> {
   private final HoodieCDCLogger cdcLogger;
 
@@ -59,19 +59,20 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T extends HoodieRecordPaylo
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
-    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, indexedRecord);
+  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+      throws IOException {
+    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, combineRecordOp);
     if (result) {
       boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
-      cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() : indexedRecord);
+      cdcLogger.put(hoodieRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : combineRecordOp.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
     }
     return result;
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
-    super.writeInsertRecord(hoodieRecord, insertRecord);
+  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
+    super.writeInsertRecord(hoodieRecord);
     if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
-      cdcLogger.put(hoodieRecord, null, insertRecord);
+      cdcLogger.put(hoodieRecord, null, Option.of((GenericRecord) hoodieRecord.getData()));
     }
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index f6adbbf0d4..b85c7c270f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -20,9 +20,9 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.util.Option;
@@ -30,10 +30,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
@@ -43,7 +43,7 @@ import java.util.List;
  * <p>The cdc about logic is copied from {@link HoodieMergeHandleWithChangeLog},
  * we should refactor it out when there are good abstractions.
  */
-public class FlinkMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, K, O>
+public class FlinkMergeHandleWithChangeLog<T, I, K, O>
     extends FlinkMergeHandle<T, I, K, O> {
   private final HoodieCDCLogger cdcLogger;
 
@@ -62,19 +62,20 @@ public class FlinkMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, K,
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
-    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, indexedRecord);
+  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+      throws IOException {
+    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, combineRecordOp);
     if (result) {
       boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
-      cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() : indexedRecord);
+      cdcLogger.put(hoodieRecord, (GenericRecord) oldRecord.getData(), isDelete ? Option.empty() : combineRecordOp.map(rec -> ((HoodieAvroIndexedRecord) rec).getData()));
     }
     return result;
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
-    super.writeInsertRecord(hoodieRecord, insertRecord);
+  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
+    super.writeInsertRecord(hoodieRecord);
     if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
-      cdcLogger.put(hoodieRecord, null, insertRecord);
+      cdcLogger.put(hoodieRecord, null, Option.of((GenericRecord) hoodieRecord.getData()));
     }
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
index fbc1c7ec55..9759d84cae 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
@@ -20,7 +20,6 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -39,7 +38,7 @@ public class FlinkWriteHandleFactory {
   /**
    * Returns the write handle factory with given write config.
    */
-  public static <T extends HoodieRecordPayload, I, K, O> Factory<T, I, K, O> getFactory(
+  public static <T, I, K, O> Factory<T, I, K, O> getFactory(
       HoodieTableConfig tableConfig,
       HoodieWriteConfig writeConfig) {
     if (writeConfig.allowDuplicateInserts()) {
@@ -58,7 +57,7 @@ public class FlinkWriteHandleFactory {
   //  Inner Class
   // -------------------------------------------------------------------------
 
-  public interface Factory<T extends HoodieRecordPayload, I, K, O> {
+  public interface Factory<T, I, K, O> {
     /**
      * Get or create a new write handle in order to reuse the file handles.
      *
@@ -87,7 +86,7 @@ public class FlinkWriteHandleFactory {
    * Base clazz for commit write handle factory,
    * it encapsulates the handle switching logic: INSERT OR UPSERT.
    */
-  private abstract static class BaseCommitWriteHandleFactory<T extends HoodieRecordPayload, I, K, O> implements Factory<T, I, K, O> {
+  private abstract static class BaseCommitWriteHandleFactory<T, I, K, O> implements Factory<T, I, K, O> {
     @Override
     public HoodieWriteHandle<?, ?, ?, ?> create(
         Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles,
@@ -140,12 +139,12 @@ public class FlinkWriteHandleFactory {
   /**
    * Write handle factory for commit.
    */
-  private static class CommitWriteHandleFactory<T extends HoodieRecordPayload, I, K, O>
+  private static class CommitWriteHandleFactory<T, I, K, O>
       extends BaseCommitWriteHandleFactory<T, I, K, O> {
     private static final CommitWriteHandleFactory<?, ?, ?, ?> INSTANCE = new CommitWriteHandleFactory<>();
 
     @SuppressWarnings("unchecked")
-    public static <T extends HoodieRecordPayload, I, K, O> CommitWriteHandleFactory<T, I, K, O> getInstance() {
+    public static <T, I, K, O> CommitWriteHandleFactory<T, I, K, O> getInstance() {
       return (CommitWriteHandleFactory<T, I, K, O>) INSTANCE;
     }
 
@@ -178,12 +177,12 @@ public class FlinkWriteHandleFactory {
   /**
    * Write handle factory for inline clustering.
    */
-  private static class ClusterWriteHandleFactory<T extends HoodieRecordPayload, I, K, O>
+  private static class ClusterWriteHandleFactory<T, I, K, O>
       extends BaseCommitWriteHandleFactory<T, I, K, O> {
     private static final ClusterWriteHandleFactory<?, ?, ?, ?> INSTANCE = new ClusterWriteHandleFactory<>();
 
     @SuppressWarnings("unchecked")
-    public static <T extends HoodieRecordPayload, I, K, O> ClusterWriteHandleFactory<T, I, K, O> getInstance() {
+    public static <T, I, K, O> ClusterWriteHandleFactory<T, I, K, O> getInstance() {
       return (ClusterWriteHandleFactory<T, I, K, O>) INSTANCE;
     }
 
@@ -216,12 +215,12 @@ public class FlinkWriteHandleFactory {
   /**
    * Write handle factory for commit, the write handle supports logging change logs.
    */
-  private static class CdcWriteHandleFactory<T extends HoodieRecordPayload, I, K, O>
+  private static class CdcWriteHandleFactory<T, I, K, O>
       extends BaseCommitWriteHandleFactory<T, I, K, O> {
     private static final CdcWriteHandleFactory<?, ?, ?, ?> INSTANCE = new CdcWriteHandleFactory<>();
 
     @SuppressWarnings("unchecked")
-    public static <T extends HoodieRecordPayload, I, K, O> CdcWriteHandleFactory<T, I, K, O> getInstance() {
+    public static <T, I, K, O> CdcWriteHandleFactory<T, I, K, O> getInstance() {
       return (CdcWriteHandleFactory<T, I, K, O>) INSTANCE;
     }
 
@@ -254,11 +253,11 @@ public class FlinkWriteHandleFactory {
   /**
    * Write handle factory for delta commit.
    */
-  private static class DeltaCommitWriteHandleFactory<T extends HoodieRecordPayload, I, K, O> implements Factory<T, I, K, O> {
+  private static class DeltaCommitWriteHandleFactory<T, I, K, O> implements Factory<T, I, K, O> {
     private static final DeltaCommitWriteHandleFactory<?, ?, ?, ?> INSTANCE = new DeltaCommitWriteHandleFactory<>();
 
     @SuppressWarnings("unchecked")
-    public static <T extends HoodieRecordPayload, I, K, O> DeltaCommitWriteHandleFactory<T, I, K, O> getInstance() {
+    public static <T, I, K, O> DeltaCommitWriteHandleFactory<T, I, K, O> getInstance() {
       return (DeltaCommitWriteHandleFactory<T, I, K, O>) INSTANCE;
     }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index ee2cf7a864..4993c43608 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 255b0c612c..7edbb55c3e 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 4ad568a591..4c73012d2c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -197,7 +197,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
         case ZORDER:
         case HILBERT:
           return isRowPartitioner
-              ? new RowSpatialCurveSortPartitioner(getWriteConfig(), recordType)
+              ? new RowSpatialCurveSortPartitioner(getWriteConfig())
               : new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy,
                   getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), recordType);
         case LINEAR:
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 51718a2e94..5ba5c5dfdf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -43,8 +43,6 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 
-import java.util.Arrays;
-import java.util.List;
 import java.util.Properties;
 
 /**
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 296abaf4f5..8d6787b2d5 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -114,7 +114,7 @@ object HoodieDatasetBulkInsertHelper extends Logging {
    */
   def bulkInsert(dataset: Dataset[Row],
                  instantTime: String,
-                 table: HoodieTable[_ <: HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]], _, _, _],
+                 table: HoodieTable[_, _, _, _],
                  writeConfig: HoodieWriteConfig,
                  partitioner: BulkInsertPartitioner[Dataset[Row]],
                  parallelism: Int,
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index b03e559603..c0a0307c3e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -446,11 +446,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    */
   private void testDeduplication(
       Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean populateMetaFields) throws Exception {
-    HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
-        .combineInput(true, true);
-    addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
-    HoodieWriteConfig writeConfig = configBuilder.build();
-
     String newCommitTime = "001";
 
     String recordKey = UUID.randomUUID().toString();
@@ -478,7 +473,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     when(index.isGlobal()).thenReturn(true);
     HoodieRecordMerger recordMerger = HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
     int dedupParallelism = records.getNumPartitions() + 100;
-    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), , writeConfig.getProps(), merge);
+    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
+        HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
     List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
     assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
     assertEquals(1, dedupedRecs.size());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
index f194ddf8f4..f524141552 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
@@ -19,9 +19,12 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
@@ -59,7 +62,8 @@ public class HoodieCDCLogRecordIterator implements ClosableIterator<IndexedRecor
     if (itr == null || !itr.hasNext()) {
       if (reader.hasNext()) {
         HoodieDataBlock dataBlock = (HoodieDataBlock) reader.next();
-        itr = dataBlock.getRecordIterator();
+        // TODO support cdc with spark record.
+        itr = new MappingIterator(dataBlock.getRecordIterator(HoodieRecordType.AVRO), record -> ((HoodieAvroIndexedRecord) record).getData());
         return itr.hasNext();
       }
       return false;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 9c6135dd28..17dc03adec 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -22,6 +22,9 @@ import org.apache.hudi.common.fs.SizeAwareDataInputStream;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockContentLocation;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.MappingIterator;
 import org.apache.hudi.common.util.Option;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
index cc5663262c..a373a5ceb5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
@@ -19,10 +19,13 @@
 package org.apache.hudi.common.table.log.block;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockContentLocation;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 import org.apache.hudi.common.util.Option;
 
 import java.util.HashMap;
@@ -46,7 +49,7 @@ public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
         Option.of(readerSchema), header, new HashMap<>(), keyField, null);
   }
 
-  public HoodieCDCDataBlock(List<IndexedRecord> records,
+  public HoodieCDCDataBlock(List<HoodieRecord> records,
                             Map<HeaderMetadataType, String> header,
                             String keyField) {
     super(records, header, keyField);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 18437b5650..ba63b982f3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -433,7 +433,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
       Properties props = new Properties();
       config.addAllToProperties(props);
-      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, props, this.writeClient.getConfig().getSchema(), recordMerger);
+      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger);
     }
     bucket.preWrite(records);
     final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index b8d4802d51..5364f6d96b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -792,8 +792,8 @@ public class MergeOnReadInputFormat
               continue;
             }
             IndexedRecord avroRecord = avroProjection.isPresent()
-                ? avroProjection.get().apply(mergedAvroRecord.get())
-                : mergedAvroRecord.get();
+                ? avroProjection.get().apply(mergedAvroRecord.get().getData())
+                : mergedAvroRecord.get().getData();
             this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
             this.currentRecord.setRowKind(rowKind);
             return false;
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 87e04e8fd2..f18ef4fc13 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -295,7 +295,8 @@ object HoodieSparkSqlWriter {
               tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
             )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
             val writeConfig = client.getConfig
-            if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+            if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ &&
+              writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
               throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
             }
             // Create a HoodieWriteClient & issue the write.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
index 07a0ce7f23..87758ebd3c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
@@ -21,33 +21,35 @@ package org.apache.hudi
 import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection}
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
-import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, HoodieRecordPayload}
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieEmptyRecord, HoodieLogFile, HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.config.HoodiePayloadConfig
+import org.apache.hudi.commmon.model.HoodieSparkRecord
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.hudi.LogFileIterator._
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
-import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.engine.{EngineType, HoodieLocalEngineContext}
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
+import org.apache.hudi.common.util.{HoodieRecordUtils, SerializationUtils}
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
 import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
-
-import org.apache.avro.Schema
+import org.apache.avro.{Schema, SchemaNormalization}
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
-
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.hudi.SparkStructTypeSerializer
+import org.apache.spark.sql.HoodieCatalystExpressionUtils
 import org.apache.spark.sql.types.StructType
-
 import java.io.Closeable
+import java.nio.charset.StandardCharsets
 import java.util.Properties
-
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.util.Try
@@ -77,10 +79,13 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
   protected override val structTypeSchema: StructType = requiredSchema.structTypeSchema
 
   protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
+  protected val logFileReaderStructType: StructType = tableSchema.structTypeSchema
 
   protected var recordToLoad: InternalRow = _
 
-  private val requiredSchemaSafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema)
+  protected val requiredSchemaSafeAvroProjection: SafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema)
+
+  protected val requiredSchemaSafeRowProjection: UnsafeProjection = HoodieCatalystExpressionUtils.generateUnsafeProjection(logFileReaderStructType, structTypeSchema)
 
   // TODO: now logScanner with internalSchema support column project, we may no need projectAvroUnsafe
   private var logScanner = {
@@ -91,21 +96,20 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
 
   private val logRecords = logScanner.getRecords.asScala
 
-  def logRecordsIterator(): Iterator[(String, HoodieRecord[_ <: HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])] = {
-    logRecords.iterator.asInstanceOf[Iterator[(String, HoodieRecord[_ <: HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])]]
+  def logRecordsPairIterator(): Iterator[(String, HoodieRecord[_])] = {
+    logRecords.iterator
   }
 
   // NOTE: This have to stay lazy to make sure it's initialized only at the point where it's
   //       going to be used, since we modify `logRecords` before that and therefore can't do it any earlier
-  protected lazy val genericRecordsIterator: Iterator[Option[GenericRecord]] =
-  logRecords.iterator.map {
-    case (_, record) =>
-      toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps))
-        .map(_.asInstanceOf[GenericRecord])
+  protected lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] = logRecords.iterator.map {
+      case (_, record: HoodieSparkRecord) => Option(record)
+      case (_, _: HoodieEmptyRecord[_]) => Option.empty
+      case (_, record) =>
+        toScalaOption(record.toIndexedRecord(logFileReaderAvroSchema, payloadProps))
   }
 
-  protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] =
-    logRecords.remove(key)
+  protected def removeLogRecord(key: String): Option[HoodieRecord[_]] = logRecords.remove(key)
 
   override def hasNext: Boolean = hasNextInternal
 
@@ -113,15 +117,16 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
   //       that recursion is unfolded into a loop to avoid stack overflows while
   //       handling records
   @tailrec private def hasNextInternal: Boolean = {
-    genericRecordsIterator.hasNext && {
-      val avroRecordOpt = genericRecordsIterator.next()
-      if (avroRecordOpt.isEmpty) {
-        // Record has been deleted, skipping
-        this.hasNextInternal
-      } else {
-        val projectedAvroRecord = requiredSchemaSafeAvroProjection(avroRecordOpt.get)
-        recordToLoad = deserialize(projectedAvroRecord)
-        true
+    logRecordsIterator.hasNext && {
+      logRecordsIterator.next() match {
+        case Some(r: HoodieAvroIndexedRecord) =>
+          val projectedAvroRecord = requiredSchemaSafeAvroProjection(r.getData.asInstanceOf[GenericRecord])
+          recordToLoad = deserialize(projectedAvroRecord)
+          true
+        case Some(r: HoodieSparkRecord) =>
+          recordToLoad = requiredSchemaSafeRowProjection(r.getData)
+          true
+        case None => this.hasNextInternal
       }
     }
   }
@@ -196,6 +201,10 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
 
   private val baseFileIterator = baseFileReader(split.dataFile.get)
 
+  private val mergerList = tableState.mergerImpls.split(",")
+    .map(_.trim).distinct.toList.asJava
+  private val recordMerger = HoodieRecordUtils.generateRecordMerger(tableState.tablePath, EngineType.SPARK, mergerList, tableState.mergerStrategy)
+
   override def hasNext: Boolean = hasNextInternal
 
   // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure
@@ -211,14 +220,12 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
         recordToLoad = requiredSchemaUnsafeProjection(curRow)
         true
       } else {
-        val mergedAvroRecordOpt = merge(serialize(curRow), updatedRecordOpt.get)
-        if (mergedAvroRecordOpt.isEmpty) {
+       val mergedRecordOpt = merge(curRow, updatedRecordOpt.get)
+        if (mergedRecordOpt.isEmpty) {
           // Record has been deleted, skipping
           this.hasNextInternal
         } else {
-          val projectedAvroRecord = projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord],
-            avroSchema, reusableRecordBuilder)
-          recordToLoad = deserialize(projectedAvroRecord)
+          recordToLoad = mergedRecordOpt.get
           true
         }
       }
@@ -230,10 +237,22 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
   private def serialize(curRowRecord: InternalRow): GenericRecord =
     serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
 
-  private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
+  private def merge(curRow: InternalRow, newRecord: HoodieRecord[_]): Option[InternalRow] = {
     // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API
     //       on the record from the Delta Log
-    toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps))
+    recordMerger.getRecordType match {
+      case HoodieRecordType.SPARK =>
+        val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema)
+        toScalaOption(recordMerger.merge(curRecord, newRecord, logFileReaderAvroSchema, payloadProps))
+          .map(r => {
+            val projection = HoodieInternalRowUtils.getCachedUnsafeProjection(r.asInstanceOf[HoodieSparkRecord].getStructType, structTypeSchema)
+            projection.apply(r.getData.asInstanceOf[InternalRow])
+          })
+      case _ =>
+        val curRecord = new HoodieAvroIndexedRecord(serialize(curRow))
+        toScalaOption(recordMerger.merge(curRecord, newRecord, logFileReaderAvroSchema, payloadProps))
+        .map(r => deserialize(projectAvroUnsafe(r.toIndexedRecord(logFileReaderAvroSchema, new Properties()).get().getData.asInstanceOf[GenericRecord], avroSchema, reusableRecordBuilder)))
+    }
   }
 }
 
@@ -302,6 +321,15 @@ object LogFileIterator {
           getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))
       }
 
+      val mergerList = tableState.mergerImpls.split(",")
+        .map(_.trim).distinct.toList.asJava
+      val recordMerger = HoodieRecordUtils.generateRecordMerger(tableState.tablePath, EngineType.SPARK, mergerList, tableState.mergerStrategy)
+      logRecordScannerBuilder.withRecordMerger(recordMerger)
+
+      if (recordMerger.getRecordType == HoodieRecordType.SPARK) {
+        registerStructTypeSerializerIfNeed(List(HoodieInternalRowUtils.getCachedSchema(logSchema)))
+      }
+
       logRecordScannerBuilder.build()
     }
   }
@@ -320,4 +348,11 @@ object LogFileIterator {
       .getOrElse(split.logFiles.head.getPath)
       .getParent
   }
+
+  private def registerStructTypeSerializerIfNeed(schemas: List[StructType]): Unit = {
+    val schemaMap = schemas.map(schema => (SchemaNormalization.fingerprint64(schema.json.getBytes(StandardCharsets.UTF_8)), schema))
+      .toMap
+    val serializer = new SparkStructTypeSerializer(schemaMap)
+    SerializationUtils.setOverallRegister(classOf[HoodieSparkRecord].getName, serializer)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 598059c030..ad60f6187b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -24,12 +24,13 @@ import org.apache.hudi.HoodieConversionUtils._
 import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, HoodieRecordPayload}
+import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._
 import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
 import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
 import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -151,7 +152,7 @@ class HoodieCDCRDD(
     private lazy val tableState = {
       val metadataConfig = HoodieMetadataConfig.newBuilder()
         .fromProperties(props)
-        .build();
+        .build()
       HoodieTableState(
         pathToString(basePath),
         split.changes.last.getInstant,
@@ -159,7 +160,10 @@ class HoodieCDCRDD(
         preCombineFieldOpt,
         usesVirtualKeys = false,
         metaClient.getTableConfig.getPayloadClass,
-        metadataConfig
+        metadataConfig,
+        // TODO support CDC with spark record
+        mergerImpls = classOf[HoodieAvroRecordMerger].getName,
+        mergerStrategy = StringUtils.DEFAULT_MERGER_STRATEGY_UUID
       )
     }
 
@@ -217,7 +221,7 @@ class HoodieCDCRDD(
      * Only one case where it will be used is that extract the change data from log files for mor table.
      * At the time, 'logRecordIter' will work with [[beforeImageRecords]] that keep all the records of the previous file slice.
      */
-    private var logRecordIter: Iterator[(String, HoodieRecord[_ <: HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])] = Iterator.empty
+    private var logRecordIter: Iterator[(String, HoodieRecord[_])] = Iterator.empty
 
     /**
      * Only one case where it will be used is that extract the change data from cdc log files.
@@ -434,7 +438,7 @@ class HoodieCDCRDD(
             val absLogPath = new Path(basePath, currentChangeFile.getCdcFile)
             val morSplit = HoodieMergeOnReadFileSplit(None, List(new HoodieLogFile(fs.getFileStatus(absLogPath))))
             val logFileIterator = new LogFileIterator(morSplit, originTableSchema, originTableSchema, tableState, conf)
-            logRecordIter = logFileIterator.logRecordsIterator()
+            logRecordIter = logFileIterator.logRecordsPairIterator
           case AS_IS =>
             assert(currentChangeFile.getCdcFile != null)
             // load beforeFileSlice to beforeImageRecords
@@ -600,9 +604,9 @@ class HoodieCDCRDD(
     }
 
     private def getInsertValue(
-        record: HoodieRecord[_ <: HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]]])
+        record: HoodieRecord[_])
     : Option[IndexedRecord] = {
-      toScalaOption(record.getData.getInsertValue(avroSchema, payloadProps))
+      toScalaOption(record.toIndexedRecord(avroSchema, payloadProps)).map(_.getData)
     }
 
     private def projectAvroUnsafe(record: IndexedRecord): GenericRecord = {
@@ -610,8 +614,8 @@ class HoodieCDCRDD(
         avroSchema, reusableRecordBuilder)
     }
 
-    private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): IndexedRecord = {
-      newRecord.getData.combineAndGetUpdateValue(curAvroRecord, avroSchema, payloadProps).get()
+    private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_]): IndexedRecord = {
+      newRecord.getData.asInstanceOf[HoodieRecordPayload[_]].combineAndGetUpdateValue(curAvroRecord, avroSchema, payloadProps).get()
     }
 
     private def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection =
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bdf2da7848..37d320e192 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -72,10 +72,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
     HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
   )
-  val sparkOpts = Map(
-    HoodieWriteConfig.MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName,
-    HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
-  )
+  val sparkOpts = Map(HoodieWriteConfig.MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName)
 
   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
index 5f447a9bce..c15123d55a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
@@ -20,7 +20,7 @@ package org.apache.hudi.functional.cdc
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieKey, HoodieLogFile, HoodieRecord}
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieCommitMetadata, HoodieKey, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.cdc.HoodieCDCOperation
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.log.HoodieLogFormat
@@ -29,17 +29,13 @@ import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.testutils.RawTripTestPayload
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
 import org.apache.hudi.testutils.HoodieClientTestBase
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
-
 import org.apache.hadoop.fs.Path
-
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.spark.sql.{DataFrame, SparkSession}
-
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNull, assertTrue}
-
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
@@ -122,8 +118,8 @@ abstract class HoodieCDCTestBase extends HoodieClientTestBase {
     val reader = HoodieLogFormat.newReader(fs, logFile, cdcSchema);
     assertTrue(reader.hasNext);
 
-    val block = reader.next().asInstanceOf[HoodieDataBlock];
-    block.getRecordIterator.asScala.toList
+    val block = reader.next().asInstanceOf[HoodieDataBlock]
+    block.getRecordIterator[IndexedRecord](HoodieRecordType.AVRO).asScala.toList.map(_.getData)
   }
 
   protected def checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode: String,
diff --git a/rfc/rfc-46/rfc-46.md b/rfc/rfc-46/rfc-46.md
index a851a4443a..192bdbf8c6 100644
--- a/rfc/rfc-46/rfc-46.md
+++ b/rfc/rfc-46/rfc-46.md
@@ -38,7 +38,7 @@ when dealing with records (during merge, column value extractions, writing into
 
 While having a single format of the record representation is certainly making implementation of some components simpler, 
 it bears unavoidable performance penalty of de-/serialization loop: every record handled by Hudi has to be converted
-from (low-level) engine-specific representation (`Row` for Spark, `RowData` for Flink, `ArrayWritable` for Hive) into intermediate 
+from (low-level) engine-specific representation (`InternalRow` for Spark, `RowData` for Flink, `ArrayWritable` for Hive) into intermediate 
 one (Avro), with some operations (like clustering, compaction) potentially incurring this penalty multiple times (on read- 
 and write-paths). 
 
@@ -84,59 +84,105 @@ is known to have poor performance (compared to non-reflection based instantiatio
 
 #### Record Merge API
 
-Stateless component interface providing for API Combining Records will look like following:
+`combineAndGetUpdateValue` and `preCombine` will converge into a single API. The stateless component interface providing the record-combining API will look like the following:
 
 ```java
-interface HoodieMerge {
-   HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
-
-   Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
-}
+interface HoodieRecordMerger {
 
    /**
-    * Spark-specific implementation 
+    * The kind of merging strategy this record merger belongs to. Each merging strategy is identified by a UUID.
     */
-   class HoodieSparkRecordMerge implements HoodieMerge {
+   String getMergingStrategy();
+  
+   // This method converges combineAndGetUpdateValue and preCombine from HoodieRecordPayload.
+   // It is expected to be an associative operation: f(a, f(b, c)) = f(f(a, b), c)
+   // (i.e. given 3 versions A, B, C of the same record, both orders of application have to yield the same result).
+   Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
+   
+   // The record type handled by the current merger
+   // SPARK, AVRO, FLINK
+   HoodieRecordType getRecordType();
+}
 
-      @Override
-      public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
-        // HoodieSparkRecords preCombine
-      }
+/**
+ * Spark-specific implementation 
+ */
+class HoodieSparkRecordMerger implements HoodieRecordMerger {
+
+   @Override
+   public String getMergingStrategy() {
+     return UUID_MERGER_STRATEGY;
+   }
+
+   @Override
+   public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+     // Combines HoodieSparkRecords, covering both the preCombine and combineAndGetUpdateValue semantics. It has to be associative.
+   }
 
-      @Override
-      public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) {
-         // HoodieSparkRecord combineAndGetUpdateValue
-      }
+   @Override
+   public HoodieRecordType getRecordType() {
+     return HoodieRecordType.SPARK;
    }
+}
    
-   /**
-    * Flink-specific implementation 
-    */
-   class HoodieFlinkRecordMerge implements HoodieMerge {
-
-      @Override
-      public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
-        // HoodieFlinkRecord preCombine
-      }
+/**
+ * Flink-specific implementation 
+ */
+class HoodieFlinkRecordMerger implements HoodieRecordMerger {
+
+   @Override
+   public String getMergingStrategy() {
+      return UUID_MERGER_STRATEGY;
+   }
+  
+   @Override
+   public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+      // Combines HoodieFlinkRecords, covering both the preCombine and combineAndGetUpdateValue semantics. It has to be associative.
+   }
 
-      @Override
-      public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) {
-         // HoodieFlinkRecord combineAndGetUpdateValue
-      }
+   @Override
+   public HoodieRecordType getRecordType() {
+      return HoodieRecordType.FLINK;
    }
+}
 ```
 Here users can provide their own implementation of this interface for the engines of interest.
 
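+For illustration, a hedged sketch of what such a user-provided merger could look like (latest-write-wins semantics; the class name and strategy UUID below are placeholders, not part of this RFC):
+
+```java
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+
+/**
+ * Hypothetical user-defined merger that always keeps the newer record.
+ */
+public class KeepNewestRecordMerger implements HoodieRecordMerger {
+
+  // Placeholder strategy UUID, not a real Hudi constant.
+  private static final String KEEP_NEWEST_STRATEGY = "00000000-0000-4000-8000-000000000000";
+
+  @Override
+  public String getMergingStrategy() {
+    return KEEP_NEWEST_STRATEGY;
+  }
+
+  @Override
+  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+    // "Latest write wins": the older version is discarded entirely.
+    // Trivially associative: any order of application yields the newest record.
+    return Option.of(newer);
+  }
+
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.AVRO;
+  }
+}
+```
+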
-#### Migration from `HoodieRecordPayload` to `HoodieMerge`
+#### Migration from `HoodieRecordPayload` to `HoodieRecordMerger`
 
 To warrant backward-compatibility (BWC) on the code-level with already created subclasses of `HoodieRecordPayload` currently
-already used in production by Hudi users, we will provide a BWC-bridge in the form of instance of `HoodieMerge`, that will 
+already used in production by Hudi users, we will provide a BWC-bridge in the form of an instance of `HoodieRecordMerger` called `HoodieAvroRecordMerger`, which will
 be using user-defined subclass of `HoodieRecordPayload` to combine the records.
 
-Leveraging such bridge will make provide for seamless BWC migration to the 0.11 release, however will be removing the performance 
+Leveraging such a bridge will provide a seamless BWC migration to the 0.11 release; however, it will forgo the performance
 benefit of this refactoring, since it would unavoidably have to perform conversion to intermediate representation (Avro). To realize
 full-suite of benefits of this refactoring, users will have to migrate their merging logic out of `HoodieRecordPayload` subclass and into
-new `HoodieMerge` implementation.
+new `HoodieRecordMerger` implementation.
+
+`preCombine` is used to merge incoming records or records read from log files with one another; `combineAndGetUpdateValue` is used to merge a record from a log file with a record from a base file.
+These two merge paths are unified in `HoodieAvroRecordMerger` as the `merge` function. `HoodieAvroRecordMerger`'s API will look like the following:
+
+```java
+/**
+ * Backward compatibility HoodieRecordPayload implementation 
+ */
+class HoodieAvroRecordMerger implements HoodieRecordMerger {
+
+   @Override
+   public String getMergingStrategy() {
+      return UUID_MERGER_STRATEGY;
+   }
+  
+   @Override
+   public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+      // Delegates to the user-defined HoodieRecordPayload's preCombine / combineAndGetUpdateValue. It has to be associative.
+   }
+
+   @Override
+   public HoodieRecordType getRecordType() {
+      return HoodieRecordType.AVRO;
+   }
+}
+```
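+
+For clarity, below is a hedged, illustrative fragment of how such a bridge could delegate the base-file merge to the user payload; it mirrors the `combineAndGetUpdateValue` call already used elsewhere in this commit, and the exact wiring of the real `HoodieAvroRecordMerger` may differ:
+
+```java
+// Illustrative only: 'older' is assumed to wrap the IndexedRecord read from the base file,
+// 'newer' is assumed to wrap the user-defined HoodieRecordPayload.
+HoodieRecordPayload payload = (HoodieRecordPayload) newer.getData();
+IndexedRecord olderAvro = (IndexedRecord) older.getData();
+Option<IndexedRecord> combined = payload.combineAndGetUpdateValue(olderAvro, schema, props);
+return combined.map(avro -> (HoodieRecord) new HoodieAvroIndexedRecord(avro));
+```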
 
 ### Refactoring Flows Directly Interacting w/ Records:
 
@@ -156,13 +202,66 @@ Following major components will be refactored:
 3. `HoodieRealtimeRecordReader`s 
    1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
 
+### Config for RecordMerger
+The RecordMerger is engine-aware. We provide a config called MERGER_IMPLS, which takes a comma-separated list of RecordMerger class names, and a config called MERGER_STRATEGY, which is the UUID of the desired merging strategy. At runtime, Hudi picks from MERGER_IMPLS the RecordMerger whose merging strategy matches MERGER_STRATEGY and whose record type matches the engine in use.
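+
+A hedged sketch of wiring these configs follows; `MERGER_IMPLS` and the default strategy UUID constant are taken from code touched by this commit, while the exact constant holding the strategy key (`MERGER_STRATEGY` below) is assumed:
+
+```java
+import java.util.Properties;
+
+// Illustrative only: offer both the Spark-native merger and the Avro bridge;
+// Hudi resolves the one matching the engine and the configured strategy at runtime.
+Properties props = new Properties();
+props.setProperty(HoodieWriteConfig.MERGER_IMPLS.key(),
+    HoodieSparkRecordMerger.class.getName() + "," + HoodieAvroRecordMerger.class.getName());
+props.setProperty(HoodieWriteConfig.MERGER_STRATEGY.key(),  // assumed to live in HoodieWriteConfig
+    StringUtils.DEFAULT_MERGER_STRATEGY_UUID);
+```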
+
+### Public API in HoodieRecord
+Because we implement different record types, we need to provide functionality similar to AvroUtils on HoodieRecord for each kind of data (Avro, InternalRow, RowData).
+Its public API will look like the following:
+
+```java
+import java.util.Properties;
+
+class HoodieRecord {
+
+   /**
+    * Get column values from the record, to support RDDCustomColumnsSortPartitioner.
+    */
+   Object getRecordColumnValues(Schema recordSchema, String[] columns,
+           boolean consistentLogicalTimestampEnabled);
+
+   /**
+    * Support bootstrap.
+    */
+   HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException;
+
+   /**
+    * Rewrite the record into a new schema (add meta columns).
+    */
+   HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema)
+           throws IOException;
+
+   /**
+    * Support schema evolution.
+    */
+   HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema,
+           Map<String, String> renameCols) throws IOException;
+
+   HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) throws IOException;
+
+   HoodieRecord updateMetadataValues(Schema recordSchema, Properties props,
+           MetadataValues metadataValues) throws IOException;
+
+   boolean isDelete(Schema recordSchema, Properties props) throws IOException;
+
+   /**
+    * Whether this is an empty record generated by ExpressionPayload.
+    */
+   boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException;
+
+   Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props)
+           throws IOException;
+
+   // Other functions with getter or setter ...
+}
+```
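+
+A hedged usage sketch of this API on the write path (the variable names and surrounding flow are illustrative, not actual Hudi write-path code):
+
+```java
+// Illustrative fragment: the record may be Avro-, Spark- or Flink-backed,
+// yet the caller never converts it to an intermediate Avro representation.
+HoodieRecord evolved = incoming.rewriteRecordWithNewSchema(recordSchema, props, writerSchema);
+if (!evolved.isDelete(writerSchema, props) && !evolved.shouldIgnore(writerSchema, props)) {
+  // hand the record to the engine-specific file writer
+}
+```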
 
 ## Rollout/Adoption Plan
 
  - What impact (if any) will there be on existing users? 
    - Users of the Hudi will observe considerably better performance for most of the routine operations: writing, reading, compaction, clustering, etc due to avoiding the superfluous intermediate de-/serialization penalty
    - By default, modified hierarchy would still leverage 
-   - Users will need to rebase their logic of combining records by creating a subclass of `HoodieRecordPayload`, and instead subclass newly created interface `HoodieMerge` to get full-suite of performance benefits 
+   - Users will need to rebase their record-combining logic off of subclassing `HoodieRecordPayload` and instead implement the newly created interface `HoodieRecordMerger` to get the full suite of performance benefits
  - If we are changing behavior how will we phase out the older behavior?
    - Older behavior leveraging `HoodieRecordPayload` for merging will be marked as deprecated in 0.11, and subsequently removed in 0.1x
  - If we need special migration tools, describe them here.