Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/28 18:23:37 UTC

[hudi] branch master updated: [HUDI-5626] Rename CDC logging mode options (#7760)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c21eca564c6 [HUDI-5626] Rename CDC logging mode options (#7760)
c21eca564c6 is described below

commit c21eca564c6426413cbdc9e83bc40ad7c59c7e5d
Author: Shiyan Xu <27...@users.noreply.github.com>
AuthorDate: Sat Jan 28 12:23:30 2023 -0600

    [HUDI-5626] Rename CDC logging mode options (#7760)
    
    Change the logging mode names for the CDC feature to:
    
    - op_key_only
    - data_before
    - data_before_after
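
The new values are the plain enum constant names, passed through the same table config key as before. For context, a minimal Spark write sketch (the CDC option keys follow HoodieTableConfig in this patch; the table name, fields, and path are illustrative):

    // Illustrative only: enable CDC on a Hudi write and pick one of the renamed modes.
    df.write.format("hudi")
      .option("hoodie.table.name", "demo_tbl")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.table.cdc.enabled", "true")
      .option("hoodie.table.cdc.supplemental.logging.mode", "data_before_after") // previously 'cdc_data_before_after'
      .mode("append")
      .save("/tmp/demo_tbl")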
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  2 +-
 .../java/org/apache/hudi/io/HoodieCDCLogger.java   |  7 ++-
 .../hudi/common/table/HoodieTableConfig.java       | 19 ++++----
 .../hudi/common/table/cdc/HoodieCDCExtractor.java  |  6 ++-
 .../hudi/common/table/cdc/HoodieCDCInferCase.java  |  4 +-
 .../cdc/HoodieCDCSupplementalLoggingMode.java      | 37 +++-------------
 .../hudi/common/table/cdc/HoodieCDCUtils.java      | 14 +++---
 .../common/functional/TestHoodieLogFormat.java     |  2 +-
 .../apache/hudi/configuration/FlinkOptions.java    | 12 ++---
 .../apache/hudi/configuration/OptionsResolver.java |  2 +-
 .../hudi/table/format/cdc/CdcInputFormat.java      |  6 +--
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  2 +-
 .../apache/hudi/table/format/TestInputFormat.java  |  4 +-
 .../scala/org/apache/hudi/cdc/CDCRelation.scala    | 21 ++++-----
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   | 36 +++++++--------
 .../hudi/functional/cdc/HoodieCDCTestBase.scala    | 32 +++++++-------
 .../functional/cdc/TestCDCDataFrameSuite.scala     | 51 ++++++++++------------
 .../functional/cdc/TestCDCStreamingSuite.scala     | 16 +++----
 .../apache/spark/sql/hudi/TestCDCForSparkSQL.scala | 11 +++--
 19 files changed, 130 insertions(+), 154 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b9d7c800250..0f7a9c1cf44 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2364,7 +2364,7 @@ public class HoodieWriteConfig extends HoodieConfig {
    * CDC supplemental logging mode.
    */
   public HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode() {
-    return HoodieCDCSupplementalLoggingMode.parse(
+    return HoodieCDCSupplementalLoggingMode.valueOf(
         getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index be646df85be..fd2dc60b58b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -58,6 +58,9 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
+
 /**
  * This class encapsulates all the cdc-writing functions.
  */
@@ -240,10 +243,10 @@ public class HoodieCDCLogger implements Closeable {
   // -------------------------------------------------------------------------
 
   private CDCTransformer getTransformer() {
-    if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
+    if (cdcSupplementalLoggingMode == data_before_after) {
       return (operation, recordKey, oldRecord, newRecord) ->
           HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), newRecord);
-    } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
+    } else if (cdcSupplementalLoggingMode == data_before) {
       return (operation, recordKey, oldRecord, newRecord) ->
           HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, removeCommitMetadata(oldRecord));
     } else {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index e450614e2b6..e34aa1e6dad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -64,6 +64,9 @@ import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only;
 
 /**
  * Configurations on the Hoodie Table like type of ingestion, storage formats, hive table name etc Configurations are loaded from hoodie.properties, these properties are usually set during
@@ -137,15 +140,15 @@ public class HoodieTableConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> CDC_SUPPLEMENTAL_LOGGING_MODE = ConfigProperty
       .key("hoodie.table.cdc.supplemental.logging.mode")
-      .defaultValue(HoodieCDCSupplementalLoggingMode.OP_KEY.getValue())
+      .defaultValue(op_key_only.name())
       .withValidValues(
-          HoodieCDCSupplementalLoggingMode.OP_KEY.getValue(),
-          HoodieCDCSupplementalLoggingMode.WITH_BEFORE.getValue(),
-          HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER.getValue())
+          op_key_only.name(),
+          data_before.name(),
+          data_before_after.name())
       .sinceVersion("0.13.0")
-      .withDocumentation("When 'cdc_op_key' persist the 'op' and the record key only,"
-          + " when 'cdc_data_before' persist the additional 'before' image ,"
-          + " and when 'cdc_data_before_after', persist the 'before' and 'after' at the same time.");
+      .withDocumentation("Setting 'op_key_only' persists the 'op' and the record key only, "
+          + "setting 'data_before' persists the additional 'before' image, "
+          + "and setting 'data_before_after' persists the additional 'before' and 'after' images.");
 
   public static final ConfigProperty<String> CREATE_SCHEMA = ConfigProperty
       .key("hoodie.table.create.schema")
@@ -659,7 +662,7 @@ public class HoodieTableConfig extends HoodieConfig {
   }
 
   public HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode() {
-    return HoodieCDCSupplementalLoggingMode.parse(getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE));
+    return HoodieCDCSupplementalLoggingMode.valueOf(getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE));
   }
 
   public String getKeyGeneratorClassName() {
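
Once a table is created with one of the new values, the mode is persisted in the table config and can be read back through the accessor touched in the hunk above. A hedged sketch (meta client construction and path are illustrative):

    // Illustrative: read the persisted logging mode back from an existing table.
    val metaClient = HoodieTableMetaClient.builder()
      .setConf(spark.sparkContext.hadoopConfiguration)
      .setBasePath("/tmp/demo_tbl")
      .build()
    val mode = metaClient.getTableConfig.cdcSupplementalLoggingMode() // e.g. data_before_after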
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 994b7ea477e..506680dc3b2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -61,6 +61,8 @@ import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_DELE
 import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_INSERT;
 import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.LOG_FILE;
 import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.REPLACE_COMMIT;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
@@ -280,7 +282,7 @@ public class HoodieCDCExtractor {
       }
     } else {
       // this is a cdc log
-      if (supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
+      if (supplementalLoggingMode == data_before_after) {
         cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcStats().keySet());
       } else {
         try {
@@ -292,7 +294,7 @@ public class HoodieCDCExtractor {
           FileSlice beforeFileSlice = null;
           FileSlice currentFileSlice = new FileSlice(fileGroupId, instant.getTimestamp(),
               new HoodieBaseFile(fs.getFileStatus(new Path(basePath, writeStat.getPath()))), new ArrayList<>());
-          if (supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.OP_KEY)) {
+          if (supplementalLoggingMode == op_key_only) {
             beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<>());
           }
           cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcStats().keySet(),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java
index c6005c60101..dfcb08a84cd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java
@@ -24,10 +24,10 @@ package org.apache.hudi.common.table.cdc;
  *
  * AS_IS:
  *   For this type, there must be a real cdc log file from which we get the whole/part change data.
- *   when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after', it keeps all the fields about the
+ *   when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before_after}, it keeps all the fields about the
  *   change data, including `op`, `ts_ms`, `before` and `after`. So read it and return directly,
  *   no more other files need to be loaded.
- *   when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before', it keeps the `op`, the key and the
+ *   when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before}, it keeps the `op`, the key and the
  *   `before` of the changing record. When `op` is equal to 'i' or 'u', need to get the current record from the
  *   current base/log file as `after`.
  *   when `hoodie.table.cdc.supplemental.logging.mode` is 'op_key', it just keeps the `op` and the key of
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
index 13a51a4f072..b52d1432fc1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.common.table.cdc;
 
-import org.apache.hudi.exception.HoodieNotSupportedException;
-
 /**
  * Change log capture supplemental logging mode. The supplemental log is used for
  * accelerating the generation of change log details.
@@ -27,36 +25,13 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
  * <p>Three modes are supported:</p>
  *
  * <ul>
- *   <li>OP_KEY: record keys, the reader needs to figure out the update before image and after image;</li>
- *   <li>WITH_BEFORE: before images, the reader needs to figure out the update after images;</li>
- *   <li>WITH_BEFORE_AFTER: before and after images, the reader can generate the details directly from the log.</li>
+ *   <li>op_key_only: record keys, the reader needs to figure out the update before image and after image;</li>
+ *   <li>data_before: before images, the reader needs to figure out the update after images;</li>
+ *   <li>data_before_after: before and after images, the reader can generate the details directly from the log.</li>
  * </ul>
  */
 public enum HoodieCDCSupplementalLoggingMode {
-  OP_KEY("cdc_op_key"),
-  WITH_BEFORE("cdc_data_before"),
-  WITH_BEFORE_AFTER("cdc_data_before_after");
-
-  private final String value;
-
-  HoodieCDCSupplementalLoggingMode(String value) {
-    this.value = value;
-  }
-
-  public String getValue() {
-    return this.value;
-  }
-
-  public static HoodieCDCSupplementalLoggingMode parse(String value) {
-    switch (value) {
-      case "cdc_op_key":
-        return OP_KEY;
-      case "cdc_data_before":
-        return WITH_BEFORE;
-      case "cdc_data_before_after":
-        return WITH_BEFORE_AFTER;
-      default:
-        throw new HoodieNotSupportedException("Unsupported value: " + value);
-    }
-  }
+  op_key_only,
+  data_before,
+  data_before_after
 }
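
With the custom parse helper removed, callers such as HoodieWriteConfig and OptionsResolver now go through the standard Enum.valueOf, so the configured string has to match a constant name exactly and an unknown value fails with Java's IllegalArgumentException rather than HoodieNotSupportedException. A small sketch of that behavior (values are illustrative):

    // Standard enum resolution used by the callers updated in this patch.
    val ok = HoodieCDCSupplementalLoggingMode.valueOf("data_before")      // resolves to data_before
    // HoodieCDCSupplementalLoggingMode.valueOf("cdc_data_before")        // old name: IllegalArgumentException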
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
index 6ca5869fdfd..069567208b0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
@@ -59,7 +59,7 @@ public class HoodieCDCUtils {
   };
 
   /**
-   * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
+   * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#op_key_only}.
    */
   public static final String CDC_SCHEMA_OP_AND_RECORDKEY_STRING = "{\"type\":\"record\",\"name\":\"Record\","
       + "\"fields\":["
@@ -73,11 +73,11 @@ public class HoodieCDCUtils {
   public static Schema schemaBySupplementalLoggingMode(
       HoodieCDCSupplementalLoggingMode supplementalLoggingMode,
       Schema tableSchema) {
-    if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.OP_KEY) {
+    if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.op_key_only) {
       return CDC_SCHEMA_OP_AND_RECORDKEY;
-    } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE) {
+    } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.data_before) {
       return createCDCSchema(tableSchema, false);
-    } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER) {
+    } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.data_before_after) {
       return createCDCSchema(tableSchema, true);
     } else {
       throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode);
@@ -109,7 +109,7 @@ public class HoodieCDCUtils {
   }
 
   /**
-   * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
+   * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before_after}.
    */
   public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String commitTime,
                                              GenericRecord before, GenericRecord after) {
@@ -122,7 +122,7 @@ public class HoodieCDCUtils {
   }
 
   /**
-   * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
+   * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before}.
    */
   public static GenericData.Record cdcRecord(Schema cdcSchema, String op,
                                              String recordKey, GenericRecord before) {
@@ -134,7 +134,7 @@ public class HoodieCDCUtils {
   }
 
   /**
-   * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
+   * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#op_key_only}.
    */
   public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String recordKey) {
     GenericData.Record record = new GenericData.Record(cdcSchema);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index a8828514eeb..9250429b377 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -551,7 +551,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         + "]}";
     Schema dataSchema = new Schema.Parser().parse(dataSchameString);
     Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
-        HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, dataSchema);
+        HoodieCDCSupplementalLoggingMode.data_before_after, dataSchema);
     GenericRecord insertedRecord = new GenericData.Record(dataSchema);
     insertedRecord.put("uuid", 1);
     insertedRecord.put("name", "apple");
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index c1812b79e38..63bb0d365a2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -25,8 +25,8 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieSyncTableStrategy;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSyncTableStrategy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -49,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
 import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
 import static org.apache.hudi.config.HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS;
 import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION;
@@ -168,12 +169,11 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> SUPPLEMENTAL_LOGGING_MODE = ConfigOptions
       .key("cdc.supplemental.logging.mode")
       .stringType()
-      .defaultValue("cdc_data_before_after") // default record all the change log images
+      .defaultValue(data_before_after.name())
       .withFallbackKeys(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key())
-      .withDescription("The supplemental logging mode:"
-          + "1) 'cdc_op_key': persist the 'op' and the record key only,"
-          + "2) 'cdc_data_before': persist the additional 'before' image,"
-          + "3) 'cdc_data_before_after': persist the 'before' and 'after' images at the same time");
+      .withDescription("Setting 'op_key_only' persists the 'op' and the record key only, "
+          + "setting 'data_before' persists the additional 'before' image, "
+          + "and setting 'data_before_after' persists the additional 'before' and 'after' images.");
 
   // ------------------------------------------------------------------------
   //  Metadata table Options
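
On the Flink side the option key is unchanged (cdc.supplemental.logging.mode, falling back to the table config key); only the accepted values and the default move to the new names. A hedged DDL sketch, written as Scala driving Flink SQL (the schema, path, and the 'cdc.enabled' toggle are assumptions here):

    // Illustrative Flink SQL DDL using the renamed mode value.
    tableEnv.executeSql(
      """
        |create table hudi_cdc_tbl (
        |  id int primary key not enforced,
        |  name string,
        |  ts timestamp(3)
        |) with (
        |  'connector' = 'hudi',
        |  'path' = 'file:///tmp/hudi_cdc_tbl',
        |  'cdc.enabled' = 'true',
        |  'cdc.supplemental.logging.mode' = 'data_before'
        |)
        |""".stripMargin)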
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index af3e25ef2c0..42b94b58351 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -210,7 +210,7 @@ public class OptionsResolver {
    */
   public static HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode(Configuration conf) {
     String mode = conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE);
-    return HoodieCDCSupplementalLoggingMode.parse(mode);
+    return HoodieCDCSupplementalLoggingMode.valueOf(mode);
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 4e162d8e2b8..8474c2a797a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -173,11 +173,11 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
         Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new Schema.Parser().parse(tableState.getAvroSchema()));
         Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
         switch (mode) {
-          case WITH_BEFORE_AFTER:
+          case data_before_after:
             return new BeforeAfterImageIterator(tablePath, tableState, hadoopConf, cdcSchema, fileSplit);
-          case WITH_BEFORE:
+          case data_before:
             return new BeforeImageIterator(conf, hadoopConf, tablePath, tableState, cdcSchema, fileSplit, imageManager);
-          case OP_KEY:
+          case op_key_only:
             return new RecordKeyImageIterator(conf, hadoopConf, tablePath, tableState, cdcSchema, fileSplit, imageManager);
           default:
             throw new AssertionError("Unexpected mode" + mode);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 10f2bcd095a..d733965b2eb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -162,7 +162,7 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.CDC_ENABLED, true)
-        .option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.getValue())
+        .option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.name())
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 6d0bf731ccc..7563498bbb6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -324,7 +324,7 @@ public class TestInputFormat {
   void testReadWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws Exception {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.CDC_ENABLED.key(), "true");
-    options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.getValue());
+    options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.name());
     beforeEach(HoodieTableType.COPY_ON_WRITE, options);
 
     // write the insert data sets
@@ -365,7 +365,7 @@ public class TestInputFormat {
   void testReadFromEarliestWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws Exception {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.CDC_ENABLED.key(), "true");
-    options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.getValue());
+    options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.name());
     options.put(FlinkOptions.READ_START_COMMIT.key(), "earliest");
     beforeEach(HoodieTableType.COPY_ON_WRITE, options);
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
index 4e90303b3b2..5b12a2ab218 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
@@ -18,24 +18,21 @@
 
 package org.apache.hudi.cdc
 
-import org.apache.hudi.AvroConversionUtils
-import org.apache.hudi.DataSourceReadOptions
-import org.apache.hudi.HoodieDataSourceHelper
-import org.apache.hudi.HoodieTableSchema
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils._
-import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.TableSchemaResolver
 import org.apache.hudi.common.table.cdc.HoodieCDCExtractor
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils._
 import org.apache.hudi.common.table.log.InstantRange
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, HoodieDataSourceHelper, HoodieTableSchema}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.{Row, SQLContext, SparkSession}
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLContext, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters._
@@ -131,7 +128,7 @@ object CDCRelation {
 
   /**
    * CDC Schema For Spark.
-   * Also it's schema when `hoodie.table.cdc.supplemental.logging.mode` is `cdc_data_before_after`.
+   * Also it's schema when `hoodie.table.cdc.supplemental.logging.mode` is [[data_before_after]].
    * Here we use the debezium format.
    */
   val FULL_CDC_SPARK_SCHEMA: StructType = {
@@ -146,7 +143,7 @@ object CDCRelation {
   }
 
   /**
-   * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is `op_key`.
+   * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is [[op_key_only]].
    */
   val MIN_CDC_SPARK_SCHEMA: StructType = {
     StructType(
@@ -158,7 +155,7 @@ object CDCRelation {
   }
 
   /**
-   * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is `cdc_data_before`.
+   * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is [[data_before]].
    */
   val CDC_WITH_BEFORE_SPARK_SCHEMA: StructType = {
     StructType(
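
Regardless of which mode the writer persisted, the Spark reader exposes the same debezium-style schema (op, ts_ms, before, after); the mode only decides how much of that is materialized in the CDC log versus recomputed at read time. A hedged read sketch (the incremental/CDC query option keys are assumed from DataSourceReadOptions and the begin instant is illustrative):

    // Illustrative CDC read via the incremental query path.
    val cdc = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.query.incremental.format", "cdc")
      .option("hoodie.datasource.read.begin.instanttime", "20230128000000000")
      .load("/tmp/demo_tbl")
    cdc.select("op", "ts_ms", "before", "after").show(false)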
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 4768e4a3d87..767300f3659 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -18,40 +18,40 @@
 
 package org.apache.hudi.cdc
 
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.hudi.HoodieBaseRelation.BaseFileReader
-import org.apache.hudi.{AvroConversionUtils, AvroProjection, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport}
 import org.apache.hudi.HoodieConversionUtils._
 import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord, HoodieRecordMerger, HoodieRecordPayload}
+import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._
 import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
-import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
+import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
 import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder, IndexedRecord}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.{Partition, SerializableWritable, TaskContext}
+import org.apache.hudi.{AvroConversionUtils, AvroProjection, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.avro.HoodieAvroDeserializer
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Projection
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.{Partition, SerializableWritable, TaskContext}
 
 import java.io.Closeable
 import java.util.Properties
@@ -244,7 +244,7 @@ class HoodieCDCRDD(
 
     /**
      * Keep the after-image data. Only one case will use this:
-     * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is 'op_key' or 'cdc_data_before'.
+     * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is [[op_key_only]] or [[data_before]].
      */
     private var afterImageRecords: mutable.Map[String, InternalRow] = mutable.Map.empty
 
@@ -306,13 +306,13 @@ class HoodieCDCRDD(
         case AS_IS =>
           val record = cdcLogRecordIterator.next().asInstanceOf[GenericRecord]
           cdcSupplementalLoggingMode match {
-            case HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER =>
+            case `data_before_after` =>
               recordToLoad.update(0, convertToUTF8String(String.valueOf(record.get(0))))
               val before = record.get(2).asInstanceOf[GenericRecord]
               recordToLoad.update(2, recordToJsonAsUTF8String(before))
               val after = record.get(3).asInstanceOf[GenericRecord]
               recordToLoad.update(3, recordToJsonAsUTF8String(after))
-            case HoodieCDCSupplementalLoggingMode.WITH_BEFORE =>
+            case `data_before` =>
               val row = cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
               val op = row.getString(0)
               val recordKey = row.getString(1)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
index 7ad7ec10247..fce3f2289e6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieKey, HoodieLogFile, HoodieRecord}
-import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCUtils}
+import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.log.HoodieLogFormat
 import org.apache.hudi.common.table.log.block.HoodieDataBlock
@@ -33,9 +33,11 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{data_before, op_key_only}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNull}
+
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
@@ -133,19 +135,19 @@ abstract class HoodieCDCTestBase extends HoodieClientTestBase {
     records.toList
   }
 
-  protected def checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode: String,
-                                             cdcSchema: Schema,
-                                             dataSchema: Schema,
-                                             cdcRecords: Seq[HoodieRecord[_]],
-                                             newHoodieRecords: java.util.List[HoodieRecord[_]],
-                                             op: HoodieCDCOperation): Unit = {
+  protected def checkCDCDataForInsertOrUpdate(loggingMode: HoodieCDCSupplementalLoggingMode,
+                                              cdcSchema: Schema,
+                                              dataSchema: Schema,
+                                              cdcRecords: Seq[HoodieRecord[_]],
+                                              newHoodieRecords: java.util.List[HoodieRecord[_]],
+                                              op: HoodieCDCOperation): Unit = {
     val cdcRecord = cdcRecords.head.getData.asInstanceOf[GenericRecord]
     // check schema
     assertEquals(cdcRecord.getSchema, cdcSchema)
-    if (cdcSupplementalLoggingMode == "cdc_op_key") {
+    if (loggingMode == op_key_only) {
       // check record key
       assert(cdcRecords.map(_.getData.asInstanceOf[GenericRecord].get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted)
-    } else if (cdcSupplementalLoggingMode == "cdc_data_before") {
+    } else if (loggingMode == data_before) {
       // check record key
       assert(cdcRecords.map(_.getData.asInstanceOf[GenericRecord].get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted)
       // check before
@@ -181,17 +183,17 @@ abstract class HoodieCDCTestBase extends HoodieClientTestBase {
     }
   }
 
-  protected def checkCDCDataForDelete(cdcSupplementalLoggingMode: String,
-                                     cdcSchema: Schema,
-                                     cdcRecords: Seq[IndexedRecord],
-                                     deletedKeys: java.util.List[HoodieKey]): Unit = {
+  protected def checkCDCDataForDelete(loggingMode: HoodieCDCSupplementalLoggingMode,
+                                      cdcSchema: Schema,
+                                      cdcRecords: Seq[IndexedRecord],
+                                      deletedKeys: java.util.List[HoodieKey]): Unit = {
     val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord]
     // check schema
     assertEquals(cdcRecord.getSchema, cdcSchema)
-    if (cdcSupplementalLoggingMode == "cdc_op_key") {
+    if (loggingMode == op_key_only) {
       // check record key
       assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted)
-    } else if (cdcSupplementalLoggingMode == "cdc_data_before") {
+    } else if (loggingMode == data_before) {
       // check record key
       assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted)
     } else {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index bc904853806..14b4f50700a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -20,16 +20,16 @@ package org.apache.hudi.functional.cdc
 
 import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.DataSourceWriteOptions
-import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils.schemaBySupplementalLoggingMode
+import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
-
 import org.apache.spark.sql.SaveMode
-
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.CsvSource
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
 
 import scala.collection.JavaConversions._
 
@@ -44,10 +44,10 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
    * Step6: Bluk_Insert 20
    */
   @ParameterizedTest
-  @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after"))
-  def testCOWDataSourceWrite(cdcSupplementalLoggingMode: String): Unit = {
+  @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+  def testCOWDataSourceWrite(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
     val options = commonOpts ++ Map(
-      HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode
+      HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> loggingMode.name()
     )
 
     var totalInsertedCnt = 0L
@@ -70,8 +70,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
 
     val schemaResolver = new TableSchemaResolver(metaClient)
     val dataSchema = schemaResolver.getTableAvroSchema(false)
-    val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
-      HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema)
+    val cdcSchema = schemaBySupplementalLoggingMode(loggingMode, dataSchema)
 
     totalInsertedCnt += 100
     val instant1 = metaClient.reloadActiveTimeline.lastInstant().get()
@@ -98,7 +97,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     // check the num of cdc data
     assertEquals(cdcDataFromCDCLogFile2.size, 50)
     // check record key, before, after according to the supplemental logging mode
-    checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode, cdcSchema, dataSchema,
+    checkCDCDataForInsertOrUpdate(loggingMode, cdcSchema, dataSchema,
       cdcDataFromCDCLogFile2, hoodieRecords2, HoodieCDCOperation.UPDATE)
 
     val commitTime2 = instant2.getTimestamp
@@ -225,11 +224,11 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
    * Step7: Upsert 30 With CLean
    */
   @ParameterizedTest
-  @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after"))
-  def testMORDataSourceWrite(cdcSupplementalLoggingMode: String): Unit = {
+  @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+  def testMORDataSourceWrite(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
     val options = commonOpts ++ Map(
       DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
-      HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode
+      HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> loggingMode.name()
     )
 
     var totalInsertedCnt = 0L
@@ -252,8 +251,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
 
     val schemaResolver = new TableSchemaResolver(metaClient)
     val dataSchema = schemaResolver.getTableAvroSchema(false)
-    val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
-      HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema)
+    val cdcSchema = schemaBySupplementalLoggingMode(loggingMode, dataSchema)
 
     totalInsertedCnt += 100
     val instant1 = metaClient.reloadActiveTimeline.lastInstant().get()
@@ -429,14 +427,14 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
    */
   @ParameterizedTest
   @CsvSource(Array(
-    "COPY_ON_WRITE,cdc_data_before_after", "MERGE_ON_READ,cdc_data_before_after",
-    "COPY_ON_WRITE,cdc_data_before", "MERGE_ON_READ,cdc_data_before",
-    "COPY_ON_WRITE,cdc_op_key", "MERGE_ON_READ,cdc_op_key"))
-  def testDataSourceWriteWithPartitionField(tableType: String, cdcSupplementalLoggingMode: String): Unit = {
+    "COPY_ON_WRITE,data_before_after", "MERGE_ON_READ,data_before_after",
+    "COPY_ON_WRITE,data_before", "MERGE_ON_READ,data_before",
+    "COPY_ON_WRITE,op_key_only", "MERGE_ON_READ,op_key_only"))
+  def testDataSourceWriteWithPartitionField(tableType: String, loggingMode: String): Unit = {
     val options = commonOpts ++ Map(
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
       DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
-      HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode
+      HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> loggingMode
     )
 
     var totalInsertedCnt = 0L
@@ -545,9 +543,9 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
   }
 
   @ParameterizedTest
-  @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after"))
-  def testCDCWithMultiBlocksAndLogFiles(cdcSupplementalLoggingMode: String): Unit = {
-    val (blockSize, logFileSize) = if (cdcSupplementalLoggingMode == "cdc_op_key") {
+  @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+  def testCDCWithMultiBlocksAndLogFiles(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
+    val (blockSize, logFileSize) = if (loggingMode == op_key_only) {
       // only op and key will be stored in cdc log file, we set the smaller values for the two configs.
       // so that it can also write out more than one cdc log file
       // and each of cdc log file has more that one data block as we expect.
@@ -556,7 +554,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
       (2048, 5120)
     }
     val options = commonOpts ++ Map(
-      HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode,
+      HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> loggingMode.name(),
       "hoodie.logfile.data.block.max.size" -> blockSize.toString,
       "hoodie.logfile.max.size" -> logFileSize.toString
     )
@@ -576,8 +574,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
 
     val schemaResolver = new TableSchemaResolver(metaClient)
     val dataSchema = schemaResolver.getTableAvroSchema(false)
-    val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
-      HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema)
+    val cdcSchema = schemaBySupplementalLoggingMode(loggingMode, dataSchema)
 
     // Upsert Operation
     val hoodieRecords2 = dataGen.generateUniqueUpdates("001", 50)
@@ -595,7 +592,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     // check the num of cdc data
     assertEquals(cdcDataFromCDCLogFile2.size, 50)
     // check record key, before, after according to the supplemental logging mode
-    checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode, cdcSchema, dataSchema,
+    checkCDCDataForInsertOrUpdate(loggingMode, cdcSchema, dataSchema,
       cdcDataFromCDCLogFile2, hoodieRecords2, HoodieCDCOperation.UPDATE)
 
     val commitTime2 = instant2.getTimestamp
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCStreamingSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCStreamingSuite.scala
index 873d55d64cd..28a993e0510 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCStreamingSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCStreamingSuite.scala
@@ -17,19 +17,17 @@
 
 package org.apache.hudi.functional.cdc
 
-import org.apache.hudi.DataSourceReadOptions
-import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.config.HoodieWriteConfig
-
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
 import org.apache.spark.sql.QueryTest.checkAnswer
-import org.apache.spark.sql.{Column, Dataset, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.{Add, If, Literal}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
-
+import org.apache.spark.sql.{Column, Dataset, Row, SaveMode}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.CsvSource
+import org.junit.jupiter.params.provider.EnumSource
 
 class TestCDCStreamingSuite extends HoodieCDCTestBase {
 
@@ -45,8 +43,8 @@ class TestCDCStreamingSuite extends HoodieCDCTestBase {
    *     and write to country_to_population_tbl.
    */
   @ParameterizedTest
-  @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after"))
-  def cdcStreaming(cdcSupplementalLoggingMode: String): Unit = {
+  @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+  def cdcStreaming(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
     val commonOptions = Map(
       "hoodie.insert.shuffle.parallelism" -> "4",
       "hoodie.upsert.shuffle.parallelism" -> "4",
@@ -69,7 +67,7 @@ class TestCDCStreamingSuite extends HoodieCDCTestBase {
     userToCountryDF.write.format("hudi")
       .options(commonOptions)
       .option(HoodieTableConfig.CDC_ENABLED.key, "true")
-      .option(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key, cdcSupplementalLoggingMode)
+      .option(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key, loggingMode.name())
       .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "userid")
       .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
       .option(HoodieWriteConfig.TBL_NAME.key, "user_to_country")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
index 60aa3c3e077..bec2230e5ab 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
@@ -19,10 +19,9 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.common.table.HoodieTableMetaClient
-
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{data_before, op_key_only}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions._
-
 import org.junit.jupiter.api.Assertions.assertEquals
 
 class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
@@ -54,7 +53,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
     spark.sql(s"use $databaseName")
 
     Seq("cow", "mor").foreach { tableType =>
-      Seq("cdc_op_key", "cdc_data_before").foreach { cdcSupplementalLoggingMode =>
+      Seq(op_key_only, data_before).foreach { loggingMode =>
         withTempDir { tmp =>
           val tableName = generateTableName
           val basePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -70,7 +69,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
                |   'primaryKey' = 'id',
                |   'preCombineField' = 'ts',
                |   'hoodie.table.cdc.enabled' = 'true',
-               |   'hoodie.table.cdc.supplemental.logging.mode' = '$cdcSupplementalLoggingMode',
+               |   'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}',
                |   type = '$tableType'
                | )
                | location '$basePath'
@@ -174,7 +173,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
     spark.sql(s"use $databaseName")
 
     Seq("cow", "mor").foreach { tableType =>
-      Seq("cdc_op_key", "cdc_data_before").foreach { cdcSupplementalLoggingMode =>
+      Seq(op_key_only, data_before).foreach { loggingMode =>
         withTempDir { tmp =>
           val tableName = generateTableName
           val basePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -192,7 +191,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
                |   'primaryKey' = 'id',
                |   'preCombineField' = 'ts',
                |   'hoodie.table.cdc.enabled' = 'true',
-               |   'hoodie.table.cdc.supplemental.logging.mode' = '$cdcSupplementalLoggingMode',
+               |   'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}',
                |   'type' = '$tableType'
                | )
                | location '$basePath'