Posted to commits@hudi.apache.org by da...@apache.org on 2022/09/23 10:36:59 UTC

[hudi] branch master updated: [HUDI-4897] Refactor the merge handle in CDC mode (#6740)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new efe553b327 [HUDI-4897] Refactor the merge handle in CDC mode (#6740)
efe553b327 is described below

commit efe553b327bc025d242afa37221a740dca9b1ea6
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Sep 23 18:36:48 2022 +0800

    [HUDI-4897] Refactor the merge handle in CDC mode (#6740)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   9 ++
 .../java/org/apache/hudi/io/HoodieCDCLogger.java   | 148 ++++++++++++---------
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  56 ++------
 .../apache/hudi/io/HoodieMergeHandleFactory.java   | 101 ++++++++++++++
 .../hudi/io/HoodieMergeHandleWithChangeLog.java    |  97 ++++++++++++++
 .../apache/hudi/io/HoodieSortedMergeHandle.java    |  23 +---
 .../io/HoodieSortedMergeHandleWithChangeLog.java   |  59 ++++++++
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  11 ++
 .../hudi/table/action/clean/CleanPlanner.java      |  22 +--
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  11 +-
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |  11 +-
 .../commit/BaseJavaCommitActionExecutor.java       |  20 +--
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  11 +-
 .../commit/BaseSparkCommitActionExecutor.java      |  14 +-
 .../hudi/common/table/cdc/HoodieCDCOperation.java  |   3 +
 .../cdc/HoodieCDCSupplementalLoggingMode.java      |  12 ++
 .../hudi/common/table/cdc/HoodieCDCUtils.java      |   9 +-
 .../common/table/log/block/HoodieCDCDataBlock.java |   4 +-
 18 files changed, 420 insertions(+), 201 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7f805410ea..032d790d63 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -2160,6 +2161,14 @@ public class HoodieWriteConfig extends HoodieConfig {
     return metastoreConfig.enableMetastore();
   }
 
+  /**
+   * CDC supplemental logging mode.
+   */
+  public HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode() {
+    return HoodieCDCSupplementalLoggingMode.parse(
+        getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index e4f1e14252..f57b195c76 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -18,14 +18,6 @@
 
 package org.apache.hudi.io;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -48,6 +40,13 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
@@ -67,13 +66,9 @@ public class HoodieCDCLogger implements Closeable {
 
   private final Schema dataSchema;
 
-  private final boolean populateMetaFields;
-
   // writer for cdc data
   private final HoodieLogFormat.Writer cdcWriter;
 
-  private final boolean cdcEnabled;
-
   private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode;
 
   private final Schema cdcSchema;
@@ -83,6 +78,9 @@ public class HoodieCDCLogger implements Closeable {
   // the cdc data
   private final Map<String, HoodieAvroPayload> cdcData;
 
+  // the cdc record transformer
+  private final CDCTransformer transformer;
+
   public HoodieCDCLogger(
       String commitTime,
       HoodieWriteConfig config,
@@ -93,15 +91,11 @@ public class HoodieCDCLogger implements Closeable {
     try {
       this.commitTime = commitTime;
       this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
-      this.populateMetaFields = config.populateMetaFields();
-      this.keyField = populateMetaFields ? HoodieRecord.RECORD_KEY_METADATA_FIELD
+      this.keyField = config.populateMetaFields()
+          ? HoodieRecord.RECORD_KEY_METADATA_FIELD
           : tableConfig.getRecordKeyFieldProp();
       this.cdcWriter = cdcWriter;
-
-      this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
-      this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
-          config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
-
+      this.cdcSupplementalLoggingMode = config.getCDCSupplementalLoggingMode();
       this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
           cdcSupplementalLoggingMode,
           dataSchema
@@ -114,8 +108,8 @@ public class HoodieCDCLogger implements Closeable {
           new DefaultSizeEstimator<>(),
           new DefaultSizeEstimator<>(),
           config.getCommonConfig().getSpillableDiskMapType(),
-          config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()
-      );
+          config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
+      this.transformer = getTransformer();
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", e);
     }
@@ -124,55 +118,25 @@ public class HoodieCDCLogger implements Closeable {
   public void put(HoodieRecord hoodieRecord,
                   GenericRecord oldRecord,
                   Option<IndexedRecord> newRecord) {
-    if (cdcEnabled) {
-      String recordKey = hoodieRecord.getRecordKey();
-      GenericData.Record cdcRecord;
-      if (newRecord.isPresent()) {
-        GenericRecord record = (GenericRecord) newRecord.get();
-        if (oldRecord == null) {
-          // inserted cdc record
-          cdcRecord = createCDCRecord(HoodieCDCOperation.INSERT, recordKey,
-              null, record);
-        } else {
-          // updated cdc record
-          cdcRecord = createCDCRecord(HoodieCDCOperation.UPDATE, recordKey,
-              oldRecord, record);
-        }
+    String recordKey = hoodieRecord.getRecordKey();
+    GenericData.Record cdcRecord;
+    if (newRecord.isPresent()) {
+      GenericRecord record = (GenericRecord) newRecord.get();
+      if (oldRecord == null) {
+        // INSERT cdc record
+        cdcRecord = this.transformer.transform(HoodieCDCOperation.INSERT, recordKey,
+            null, record);
       } else {
-        // deleted cdc record
-        cdcRecord = createCDCRecord(HoodieCDCOperation.DELETE, recordKey,
-            oldRecord, null);
+        // UPDATE cdc record
+        cdcRecord = this.transformer.transform(HoodieCDCOperation.UPDATE, recordKey,
+            oldRecord, record);
       }
-      cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));
-    }
-  }
-
-  private GenericData.Record createCDCRecord(HoodieCDCOperation operation,
-                                             String recordKey,
-                                             GenericRecord oldRecord,
-                                             GenericRecord newRecord) {
-    GenericData.Record record;
-    if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
-      record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime,
-          removeCommitMetadata(oldRecord), newRecord);
-    } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
-      record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey,
-          removeCommitMetadata(oldRecord));
     } else {
-      record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey);
-    }
-    return record;
-  }
-
-  private GenericRecord removeCommitMetadata(GenericRecord record) {
-    if (record == null) {
-      return null;
+      // DELETE cdc record
+      cdcRecord = this.transformer.transform(HoodieCDCOperation.DELETE, recordKey,
+          oldRecord, null);
     }
-    return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>());
-  }
-
-  public boolean isEmpty() {
-    return !this.cdcEnabled || this.cdcData.isEmpty();
+    cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));
   }
 
   public Option<AppendResult> writeCDCData() {
@@ -219,6 +183,43 @@ public class HoodieCDCLogger implements Closeable {
     }
   }
 
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private CDCTransformer getTransformer() {
+    if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
+      return (operation, recordKey, oldRecord, newRecord) ->
+          HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), newRecord);
+    } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
+      return (operation, recordKey, oldRecord, newRecord) ->
+          HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, removeCommitMetadata(oldRecord));
+    } else {
+      return (operation, recordKey, oldRecord, newRecord) ->
+          HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey);
+    }
+  }
+
+  private GenericRecord removeCommitMetadata(GenericRecord record) {
+    return record == null ? null : HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, Collections.emptyMap());
+  }
+
+  public boolean isEmpty() {
+    return this.cdcData.isEmpty();
+  }
+
+  public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger,
+                                                          long recordsWritten,
+                                                          long insertRecordsWritten) {
+    if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
+      // The following are the cases where we do not need to write out the cdc file:
+      // case 1: all the data from the previous file slice is deleted and no new data is inserted;
+      // case 2: all the incoming records are INSERTs.
+      return Option.empty();
+    }
+    return cdcLogger.writeCDCData();
+  }
+
   public static void setCDCStatIfNeeded(HoodieWriteStat stat,
                                         Option<AppendResult> cdcResult,
                                         String partitionPath,
@@ -236,4 +237,19 @@ public class HoodieCDCLogger implements Closeable {
       throw new HoodieUpsertException("Failed to set cdc write stat", e);
     }
   }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /**
+   * A transformer that transforms normal data records into cdc records.
+   */
+  private interface CDCTransformer {
+    GenericData.Record transform(HoodieCDCOperation operation,
+                                 String recordKey,
+                                 GenericRecord oldRecord,
+                                 GenericRecord newRecord);
+
+  }
 }
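
The gist of the HoodieCDCLogger change above: the supplemental-logging-mode branch moves out of the
per-record path, so the logger now picks a CDCTransformer once in its constructor and reuses it for
every put(). A minimal, self-contained sketch of that pattern follows; the class and field names are
simplified stand-ins, not the actual Hudi types:

    enum Mode { OP_KEY, WITH_BEFORE, WITH_BEFORE_AFTER }

    class CdcLoggerSketch {
      @FunctionalInterface
      interface Transformer {
        String transform(String op, String key, String before, String after);
      }

      private final Transformer transformer;

      CdcLoggerSketch(Mode mode) {
        // Resolve the record shape once, instead of re-checking the mode for every record.
        switch (mode) {
          case WITH_BEFORE_AFTER:
            this.transformer = (op, key, before, after) ->
                String.format("{op: %s, before: %s, after: %s}", op, before, after);
            break;
          case WITH_BEFORE:
            this.transformer = (op, key, before, after) ->
                String.format("{op: %s, key: %s, before: %s}", op, key, before);
            break;
          default: // OP_KEY
            this.transformer = (op, key, before, after) ->
                String.format("{op: %s, key: %s}", op, key);
        }
      }

      String put(String op, String key, String before, String after) {
        // Hot path: no mode check, just the pre-selected transformer.
        return transformer.transform(op, key, before, after);
      }
    }
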
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 442256ade3..5515c2552e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.io;
 
-import org.apache.hadoop.fs.Path;
-
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
@@ -33,9 +31,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
@@ -55,7 +50,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -66,8 +61,8 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
 @SuppressWarnings("Duplicates")
@@ -107,8 +102,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter<IndexedRecord> fileWriter;
-  protected boolean cdcEnabled = false;
-  protected HoodieCDCLogger cdcLogger;
   private boolean preserveMetadata = false;
 
   protected Path newFilePath;
@@ -210,18 +203,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       // Create the writer for writing the new version file
       fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
         writeSchemaWithMetaFields, taskContextSupplier);
-
-      // init the cdc logger
-      this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
-      if (cdcEnabled) {
-        this.cdcLogger = new HoodieCDCLogger(
-            instantTime,
-            config,
-            hoodieTable.getMetaClient().getTableConfig(),
-            tableSchema,
-            createLogWriter(Option.empty(), instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
-            IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
-      }
     } catch (IOException io) {
       LOG.error("Error in update task at commit " + instantTime, io);
       writeStatus.setGlobalError(io);
@@ -287,7 +268,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
         + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
   }
 
-  private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
+  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
     boolean isDelete = false;
     if (indexedRecord.isPresent()) {
       updatedRecordsWritten++;
@@ -300,11 +281,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
         return false;
       }
     }
-    boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
-    if (result && cdcEnabled) {
-      cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
-    }
-    return result;
+    return writeRecord(hoodieRecord, indexedRecord, isDelete);
   }
 
   protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -314,10 +291,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
       return;
     }
+    writeInsertRecord(hoodieRecord, insertRecord);
+  }
+
+  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
     if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
-      if (cdcEnabled) {
-        cdcLogger.put(hoodieRecord, null, insertRecord);
-      }
       insertRecordsWritten++;
     }
   }
@@ -425,21 +403,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }
   }
 
-  private Option<AppendResult> writeCDCDataIfNeeded() {
-    if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
-      // the following cases where we do not need to write out the cdc file:
-      // case 1: all the data from the previous file slice are deleted. and no new data is inserted;
-      // case 2: all the data are new-coming,
-      return Option.empty();
-    }
-    return cdcLogger.writeCDCData();
-  }
-
   @Override
   public List<WriteStatus> close() {
     try {
-      HoodieWriteStat stat = writeStatus.getStat();
-
       writeIncomingRecords();
 
       if (keyToNewRecords instanceof ExternalSpillableMap) {
@@ -454,11 +420,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
         fileWriter = null;
       }
 
-      // if there are cdc data written, set the CDC-related information.
-      Option<AppendResult> cdcResult = writeCDCDataIfNeeded();
-      HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs);
-
       long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
+      HoodieWriteStat stat = writeStatus.getStat();
+
       stat.setTotalWriteBytes(fileSizeInBytes);
       stat.setFileSizeInBytes(fileSizeInBytes);
       stat.setNumWrites(recordsWritten);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
new file mode 100644
index 0000000000..436eff5dac
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Factory class for hoodie merge handle.
+ */
+public class HoodieMergeHandleFactory {
+  /**
+   * Creates a merge handle for normal write path.
+   */
+  public static <T extends HoodieRecordPayload, I, K, O> HoodieMergeHandle<T, I, K, O> create(
+      WriteOperationType operationType,
+      HoodieWriteConfig writeConfig,
+      String instantTime,
+      HoodieTable<T, I, K, O> table,
+      Iterator<HoodieRecord<T>> recordItr,
+      String partitionPath,
+      String fileId,
+      TaskContextSupplier taskContextSupplier,
+      Option<BaseKeyGenerator> keyGeneratorOpt) {
+    if (table.requireSortedRecords()) {
+      if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
+        return new HoodieSortedMergeHandleWithChangeLog<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier,
+            keyGeneratorOpt);
+      } else {
+        return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier,
+            keyGeneratorOpt);
+      }
+    } else if (!WriteOperationType.isChangingRecords(operationType) && writeConfig.allowDuplicateInserts()) {
+      return new HoodieConcatHandle<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+    } else {
+      if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
+        return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+      } else {
+        return new HoodieMergeHandle<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+      }
+    }
+  }
+
+  /**
+   * Creates a merge handle for compaction path.
+   */
+  public static <T extends HoodieRecordPayload, I, K, O> HoodieMergeHandle<T, I, K, O> create(
+      HoodieWriteConfig writeConfig,
+      String instantTime,
+      HoodieTable<T, I, K, O> table,
+      Map<String, HoodieRecord<T>> keyToNewRecords,
+      String partitionPath,
+      String fileId,
+      HoodieBaseFile dataFileToBeMerged,
+      TaskContextSupplier taskContextSupplier,
+      Option<BaseKeyGenerator> keyGeneratorOpt) {
+    if (table.requireSortedRecords()) {
+      if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
+        return new HoodieSortedMergeHandleWithChangeLog<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId,
+            dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+      } else {
+        return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId,
+            dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+      }
+    } else {
+      if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
+        return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId,
+            dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+      } else {
+        return new HoodieMergeHandle<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId,
+            dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+      }
+    }
+  }
+}
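
For context, every engine-specific call site added further down in this diff reduces to a single
delegating call; for example, the Java executor's normal write path becomes:

    protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
      return HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr, partitionPath, fileId,
          taskContextSupplier, Option.empty());
    }
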
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
new file mode 100644
index 0000000000..12e48ffbb4
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.AppendResult;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A merge handle that supports logging change logs.
+ */
+public class HoodieMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+  protected final HoodieCDCLogger cdcLogger;
+
+  public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                        Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+                                        TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+    this.cdcLogger = new HoodieCDCLogger(
+        instantTime,
+        config,
+        hoodieTable.getMetaClient().getTableConfig(),
+        tableSchema,
+        createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
+        IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
+  }
+
+  /**
+   * Called by the compactor code path.
+   */
+  public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                        Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+                                        HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+    this.cdcLogger = new HoodieCDCLogger(
+        instantTime,
+        config,
+        hoodieTable.getMetaClient().getTableConfig(),
+        tableSchema,
+        createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
+        IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
+  }
+
+  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
+    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, indexedRecord);
+    if (result) {
+      cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
+    }
+    return result;
+  }
+
+  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
+    super.writeInsertRecord(hoodieRecord, insertRecord);
+    cdcLogger.put(hoodieRecord, null, insertRecord);
+  }
+
+  @Override
+  public List<WriteStatus> close() {
+    List<WriteStatus> writeStatuses = super.close();
+    // if there are cdc data written, set the CDC-related information.
+    Option<AppendResult> cdcResult =
+        HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
+    HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), cdcResult, partitionPath, fs);
+    return writeStatuses;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 8240de66d5..7dce31a4c3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -18,9 +18,6 @@
 
 package org.apache.hudi.io;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -33,6 +30,8 @@ import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.avro.generic.GenericRecord;
+
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
@@ -94,18 +93,13 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
         throw new HoodieUpsertException("Insert/Update not in sorted order");
       }
       try {
-        Option<IndexedRecord> insertRecord;
         if (useWriterSchemaForCompaction) {
-          insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps());
+          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
         } else {
-          insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps());
+          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
         }
-        writeRecord(hoodieRecord, insertRecord);
         insertRecordsWritten++;
         writtenRecordKeys.add(keyToPreWrite);
-        if (cdcEnabled) {
-          cdcLogger.put(hoodieRecord, null, insertRecord);
-        }
       } catch (IOException e) {
         throw new HoodieUpsertException("Failed to write records", e);
       }
@@ -122,17 +116,12 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
         String key = newRecordKeysSorted.poll();
         HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
         if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
-          Option<IndexedRecord> insertRecord;
           if (useWriterSchemaForCompaction) {
-            insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps());
+            writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
           } else {
-            insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps());
+            writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
           }
-          writeRecord(hoodieRecord, insertRecord);
           insertRecordsWritten++;
-          if (cdcEnabled) {
-            cdcLogger.put(hoodieRecord, null, insertRecord);
-          }
         }
       } catch (IOException e) {
         throw new HoodieUpsertException("Failed to close UpdateHandle", e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
new file mode 100644
index 0000000000..8d317b709a
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.generic.IndexedRecord;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A sorted merge handle that supports logging change logs.
+ */
+public class HoodieSortedMergeHandleWithChangeLog<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandleWithChangeLog<T, I, K, O> {
+  public HoodieSortedMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                              Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+                                              TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+  }
+
+  /**
+   * Called by the compactor code path.
+   */
+  public HoodieSortedMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                           Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+                           HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+  }
+
+  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> insertRecord) {
+    final boolean result = super.writeRecord(hoodieRecord, insertRecord);
+    this.cdcLogger.put(hoodieRecord, null, insertRecord);
+    return result;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 3864c31ce9..abf5c0face 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -309,6 +310,16 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
   }
 
+  protected HoodieLogFormat.Writer createLogWriter(String baseCommitTime, String fileSuffix) {
+    try {
+      return createLogWriter(Option.empty(), baseCommitTime, fileSuffix);
+    } catch (IOException e) {
+      throw new HoodieException("Failed to create log writer with fileId: " + fileId + ", "
+          + "base commit time: " + baseCommitTime + ", "
+          + "file suffix: " + fileSuffix, e);
+    }
+  }
+
   private static class IgnoreRecord implements GenericRecord {
 
     @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 5ed53f7ae0..e5d90b5e9d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -369,22 +369,12 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
                   deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
                 }
               });
-              if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-                // If merge on read, then clean the log files for the commits as well
-                Predicate<HoodieLogFile> notCDCLogFile =
-                    hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
-                deletePaths.addAll(
-                    aSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-                        .collect(Collectors.toList()));
-              }
-              if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
-                // The cdc log files will be written out in cdc scenario, no matter the table type is mor or cow.
-                // Here we need to clean uo these cdc log files.
-                Predicate<HoodieLogFile> isCDCLogFile =
-                    hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
-                deletePaths.addAll(
-                    aSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-                        .collect(Collectors.toList()));
+              if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ
+                  || hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
+                // 1. If merge on read, then clean the log files for the commits as well;
+                // 2. If change log capture is enabled, clean the log files regardless of whether the table type is MOR or COW.
+                deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+                    .collect(Collectors.toList()));
               }
             }
           }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 2c8a3c4e49..7d2be6cb93 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -46,7 +46,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
@@ -401,13 +401,8 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
             + "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
       }
     }
-    if (requireSortedRecords()) {
-      return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
-    } else {
-      return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
-    }
+    return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+        dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 8892133498..8e72682725 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -45,7 +45,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
@@ -299,13 +299,8 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
 
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
                                               Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
-    if (requireSortedRecords()) {
-      return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, Option.empty());
-    } else {
-      return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, Option.empty());
-    }
+    return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+        dataFileToBeMerged, taskContextSupplier, Option.empty());
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index 22c90eb8bb..7762fd5ea3 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table.action.commit;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -41,8 +40,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.JavaLazyInsertIterable;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
-import org.apache.hudi.io.HoodieConcatHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
@@ -291,20 +289,8 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
   }
 
   protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
-    if (table.requireSortedRecords()) {
-      return new HoodieSortedMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, Option.empty());
-    } else if (!WriteOperationType.isChangingRecords(operationType) && config.allowDuplicateInserts()) {
-      return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, Option.empty());
-    } else {
-      return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, Option.empty());
-    }
-  }
-
-  protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,
-                                              Map<String, HoodieRecord<T>> keyToNewRecords,
-                                              HoodieBaseFile dataFileToBeMerged) {
-    return new HoodieMergeHandle<>(config, instantTime, table, keyToNewRecords,
-        partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, Option.empty());
+    return HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr, partitionPath, fileId,
+        taskContextSupplier, Option.empty());
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index a88ca65c35..115aea06f2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -48,7 +48,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -250,13 +250,8 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
             + "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
       }
     }
-    if (requireSortedRecords()) {
-      return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
-    } else {
-      return new HoodieMergeHandle(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
-    }
+    return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+        dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index af72c14efc..8c7d9e41ea 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -45,12 +45,10 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.SparkLazyInsertIterable;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.CreateHandleFactory;
-import org.apache.hudi.io.HoodieConcatHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
-import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
@@ -385,14 +383,8 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
   }
 
   protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
-    if (table.requireSortedRecords()) {
-      return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieSparkTable) table, recordItr, partitionPath, fileId, taskContextSupplier,
-          keyGeneratorOpt);
-    } else if (!WriteOperationType.isChangingRecords(operationType) && config.allowDuplicateInserts()) {
-      return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
-    } else {
-      return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
-    }
+    return HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr, partitionPath, fileId,
+        taskContextSupplier, keyGeneratorOpt);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
index edd63f3569..90540bc05a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java
@@ -20,6 +20,9 @@ package org.apache.hudi.common.table.cdc;
 
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
+/**
+ * Enumeration of change log operations.
+ */
 public enum HoodieCDCOperation {
   INSERT("i"),
   UPDATE("u"),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
index b1e92dd273..35a232206f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
@@ -20,6 +20,18 @@ package org.apache.hudi.common.table.cdc;
 
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
+/**
+ * Change log capture supplemental logging mode. The supplemental log is used for
+ * accelerating the generation of change log details.
+ *
+ * <p>Three modes are supported:</p>
+ *
+ * <ul>
+ *   <li>OP_KEY: only the operation and the record key are logged, so the reader needs to figure out both the before and after images of an update;</li>
+ *   <li>WITH_BEFORE: the before images are also logged, so the reader only needs to figure out the after images;</li>
+ *   <li>WITH_BEFORE_AFTER: both the before and after images are logged, so the reader can generate the change details directly from the log.</li>
+ * </ul>
+ */
 public enum HoodieCDCSupplementalLoggingMode {
   OP_KEY("cdc_op_key"),
   WITH_BEFORE("cdc_data_before"),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
index a741181d4d..042e95cfd6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
@@ -18,17 +18,20 @@
 
 package org.apache.hudi.common.table.cdc;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.exception.HoodieException;
+
 import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 
-import org.apache.hudi.avro.AvroSchemaUtils;
-import org.apache.hudi.exception.HoodieException;
-
 import java.util.Arrays;
 import java.util.List;
 
+/**
+ * Utilities for change log capture.
+ */
 public class HoodieCDCUtils {
 
   public static final String CDC_LOGFILE_SUFFIX = "-cdc";
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
index 1c0f6e4b6c..cc5663262c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
@@ -29,6 +29,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Change log supplemental log data block.
+ */
 public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
 
   public HoodieCDCDataBlock(
@@ -53,5 +56,4 @@ public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
   public HoodieLogBlockType getBlockType() {
     return HoodieLogBlockType.CDC_DATA_BLOCK;
   }
-
 }