Posted to commits@hudi.apache.org by yi...@apache.org on 2023/01/30 04:56:41 UTC
[hudi] 08/19: [HUDI-5626] Rename CDC logging mode options (#7760)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit cf52dc23db830aa23f966ca473ef3c4df85039f0
Author: Shiyan Xu <27...@users.noreply.github.com>
AuthorDate: Sat Jan 28 12:23:30 2023 -0600
[HUDI-5626] Rename CDC logging mode options (#7760)
Change the logging mode names for the CDC feature to (usage sketch below):
- op_key_only
- data_before
- data_before_after
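
With this change the modes are plain enum constants, resolved via the standard
Enum.valueOf rather than the removed parse() helper. A minimal usage sketch with
the new names, following the Spark write pattern used by the tests in this patch
(the DataFrame `df` and the `basePath` variable are assumed for illustration):

    import org.apache.hudi.common.table.HoodieTableConfig
    import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after

    // Enable CDC and pick one of the renamed modes:
    // op_key_only, data_before, or data_before_after.
    df.write.format("hudi")
      .option(HoodieTableConfig.CDC_ENABLED.key, "true")
      .option(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key, data_before_after.name())
      .mode("append")
      .save(basePath)

Existing tables that persisted the old string values (cdc_op_key, cdc_data_before,
cdc_data_before_after) are not migrated by this sketch; see the config validation
in HoodieTableConfig below for the accepted values.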
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 2 +-
.../java/org/apache/hudi/io/HoodieCDCLogger.java | 7 ++-
.../hudi/common/table/HoodieTableConfig.java | 19 ++++----
.../hudi/common/table/cdc/HoodieCDCExtractor.java | 6 ++-
.../hudi/common/table/cdc/HoodieCDCInferCase.java | 4 +-
.../cdc/HoodieCDCSupplementalLoggingMode.java | 37 +++-------------
.../hudi/common/table/cdc/HoodieCDCUtils.java | 14 +++---
.../common/functional/TestHoodieLogFormat.java | 2 +-
.../apache/hudi/configuration/FlinkOptions.java | 12 ++---
.../apache/hudi/configuration/OptionsResolver.java | 2 +-
.../hudi/table/format/cdc/CdcInputFormat.java | 6 +--
.../apache/hudi/table/ITTestHoodieDataSource.java | 2 +-
.../apache/hudi/table/format/TestInputFormat.java | 4 +-
.../scala/org/apache/hudi/cdc/CDCRelation.scala | 21 ++++-----
.../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala | 36 +++++++--------
.../hudi/functional/cdc/HoodieCDCTestBase.scala | 32 +++++++-------
.../functional/cdc/TestCDCDataFrameSuite.scala | 51 ++++++++++------------
.../functional/cdc/TestCDCStreamingSuite.scala | 16 +++----
.../apache/spark/sql/hudi/TestCDCForSparkSQL.scala | 11 +++--
19 files changed, 130 insertions(+), 154 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b9d7c800250..0f7a9c1cf44 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2364,7 +2364,7 @@ public class HoodieWriteConfig extends HoodieConfig {
* CDC supplemental logging mode.
*/
public HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode() {
- return HoodieCDCSupplementalLoggingMode.parse(
+ return HoodieCDCSupplementalLoggingMode.valueOf(
getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index be646df85be..fd2dc60b58b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -58,6 +58,9 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
+
/**
* This class encapsulates all the cdc-writing functions.
*/
@@ -240,10 +243,10 @@ public class HoodieCDCLogger implements Closeable {
// -------------------------------------------------------------------------
private CDCTransformer getTransformer() {
- if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
+ if (cdcSupplementalLoggingMode == data_before_after) {
return (operation, recordKey, oldRecord, newRecord) ->
HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), newRecord);
- } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
+ } else if (cdcSupplementalLoggingMode == data_before) {
return (operation, recordKey, oldRecord, newRecord) ->
HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, removeCommitMetadata(oldRecord));
} else {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index e450614e2b6..e34aa1e6dad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -64,6 +64,9 @@ import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only;
/**
* Configurations on the Hoodie Table like type of ingestion, storage formats, hive table name etc Configurations are loaded from hoodie.properties, these properties are usually set during
@@ -137,15 +140,15 @@ public class HoodieTableConfig extends HoodieConfig {
public static final ConfigProperty<String> CDC_SUPPLEMENTAL_LOGGING_MODE = ConfigProperty
.key("hoodie.table.cdc.supplemental.logging.mode")
- .defaultValue(HoodieCDCSupplementalLoggingMode.OP_KEY.getValue())
+ .defaultValue(op_key_only.name())
.withValidValues(
- HoodieCDCSupplementalLoggingMode.OP_KEY.getValue(),
- HoodieCDCSupplementalLoggingMode.WITH_BEFORE.getValue(),
- HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER.getValue())
+ op_key_only.name(),
+ data_before.name(),
+ data_before_after.name())
.sinceVersion("0.13.0")
- .withDocumentation("When 'cdc_op_key' persist the 'op' and the record key only,"
- + " when 'cdc_data_before' persist the additional 'before' image ,"
- + " and when 'cdc_data_before_after', persist the 'before' and 'after' at the same time.");
+ .withDocumentation("Setting 'op_key_only' persists the 'op' and the record key only, "
+ + "setting 'data_before' persists the additional 'before' image, "
+ + "and setting 'data_before_after' persists the additional 'before' and 'after' images.");
public static final ConfigProperty<String> CREATE_SCHEMA = ConfigProperty
.key("hoodie.table.create.schema")
@@ -659,7 +662,7 @@ public class HoodieTableConfig extends HoodieConfig {
}
public HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode() {
- return HoodieCDCSupplementalLoggingMode.parse(getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE));
+ return HoodieCDCSupplementalLoggingMode.valueOf(getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE));
}
public String getKeyGeneratorClassName() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 994b7ea477e..506680dc3b2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -61,6 +61,8 @@ import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_DELE
import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_INSERT;
import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.LOG_FILE;
import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.REPLACE_COMMIT;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
@@ -280,7 +282,7 @@ public class HoodieCDCExtractor {
}
} else {
// this is a cdc log
- if (supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
+ if (supplementalLoggingMode == data_before_after) {
cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcStats().keySet());
} else {
try {
@@ -292,7 +294,7 @@ public class HoodieCDCExtractor {
FileSlice beforeFileSlice = null;
FileSlice currentFileSlice = new FileSlice(fileGroupId, instant.getTimestamp(),
new HoodieBaseFile(fs.getFileStatus(new Path(basePath, writeStat.getPath()))), new ArrayList<>());
- if (supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.OP_KEY)) {
+ if (supplementalLoggingMode == op_key_only) {
beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<>());
}
cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcStats().keySet(),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java
index c6005c60101..dfcb08a84cd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java
@@ -24,10 +24,10 @@ package org.apache.hudi.common.table.cdc;
*
* AS_IS:
* For this type, there must be a real cdc log file from which we get the whole/part change data.
- * when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after', it keeps all the fields about the
+ * when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before_after}, it keeps all the fields about the
* change data, including `op`, `ts_ms`, `before` and `after`. So read it and return directly,
* no more other files need to be loaded.
- * when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before', it keeps the `op`, the key and the
+ * when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before}, it keeps the `op`, the key and the
* `before` of the changing record. When `op` is equal to 'i' or 'u', need to get the current record from the
* current base/log file as `after`.
* when `hoodie.table.cdc.supplemental.logging.mode` is 'op_key', it just keeps the `op` and the key of
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
index 13a51a4f072..b52d1432fc1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
@@ -18,8 +18,6 @@
package org.apache.hudi.common.table.cdc;
-import org.apache.hudi.exception.HoodieNotSupportedException;
-
/**
* Change log capture supplemental logging mode. The supplemental log is used for
* accelerating the generation of change log details.
@@ -27,36 +25,13 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
* <p>Three modes are supported:</p>
*
* <ul>
- * <li>OP_KEY: record keys, the reader needs to figure out the update before image and after image;</li>
- * <li>WITH_BEFORE: before images, the reader needs to figure out the update after images;</li>
- * <li>WITH_BEFORE_AFTER: before and after images, the reader can generate the details directly from the log.</li>
+ * <li>op_key_only: record keys, the reader needs to figure out the update before image and after image;</li>
+ * <li>data_before: before images, the reader needs to figure out the update after images;</li>
+ * <li>data_before_after: before and after images, the reader can generate the details directly from the log.</li>
* </ul>
*/
public enum HoodieCDCSupplementalLoggingMode {
- OP_KEY("cdc_op_key"),
- WITH_BEFORE("cdc_data_before"),
- WITH_BEFORE_AFTER("cdc_data_before_after");
-
- private final String value;
-
- HoodieCDCSupplementalLoggingMode(String value) {
- this.value = value;
- }
-
- public String getValue() {
- return this.value;
- }
-
- public static HoodieCDCSupplementalLoggingMode parse(String value) {
- switch (value) {
- case "cdc_op_key":
- return OP_KEY;
- case "cdc_data_before":
- return WITH_BEFORE;
- case "cdc_data_before_after":
- return WITH_BEFORE_AFTER;
- default:
- throw new HoodieNotSupportedException("Unsupported value: " + value);
- }
- }
+ op_key_only,
+ data_before,
+ data_before_after
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
index 6ca5869fdfd..069567208b0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
@@ -59,7 +59,7 @@ public class HoodieCDCUtils {
};
/**
- * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
+ * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#op_key_only}.
*/
public static final String CDC_SCHEMA_OP_AND_RECORDKEY_STRING = "{\"type\":\"record\",\"name\":\"Record\","
+ "\"fields\":["
@@ -73,11 +73,11 @@ public class HoodieCDCUtils {
public static Schema schemaBySupplementalLoggingMode(
HoodieCDCSupplementalLoggingMode supplementalLoggingMode,
Schema tableSchema) {
- if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.OP_KEY) {
+ if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.op_key_only) {
return CDC_SCHEMA_OP_AND_RECORDKEY;
- } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE) {
+ } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.data_before) {
return createCDCSchema(tableSchema, false);
- } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER) {
+ } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.data_before_after) {
return createCDCSchema(tableSchema, true);
} else {
throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode);
@@ -109,7 +109,7 @@ public class HoodieCDCUtils {
}
/**
- * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
+ * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before_after}.
*/
public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String commitTime,
GenericRecord before, GenericRecord after) {
@@ -122,7 +122,7 @@ public class HoodieCDCUtils {
}
/**
- * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
+ * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#data_before}.
*/
public static GenericData.Record cdcRecord(Schema cdcSchema, String op,
String recordKey, GenericRecord before) {
@@ -134,7 +134,7 @@ public class HoodieCDCUtils {
}
/**
- * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
+ * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is {@link HoodieCDCSupplementalLoggingMode#op_key_only}.
*/
public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String recordKey) {
GenericData.Record record = new GenericData.Record(cdcSchema);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index a8828514eeb..9250429b377 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -551,7 +551,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
+ "]}";
Schema dataSchema = new Schema.Parser().parse(dataSchameString);
Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
- HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, dataSchema);
+ HoodieCDCSupplementalLoggingMode.data_before_after, dataSchema);
GenericRecord insertedRecord = new GenericData.Record(dataSchema);
insertedRecord.put("uuid", 1);
insertedRecord.put("name", "apple");
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index c1812b79e38..63bb0d365a2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -25,8 +25,8 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieSyncTableStrategy;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSyncTableStrategy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -49,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
import static org.apache.hudi.config.HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS;
import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION;
@@ -168,12 +169,11 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String> SUPPLEMENTAL_LOGGING_MODE = ConfigOptions
.key("cdc.supplemental.logging.mode")
.stringType()
- .defaultValue("cdc_data_before_after") // default record all the change log images
+ .defaultValue(data_before_after.name())
.withFallbackKeys(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key())
- .withDescription("The supplemental logging mode:"
- + "1) 'cdc_op_key': persist the 'op' and the record key only,"
- + "2) 'cdc_data_before': persist the additional 'before' image,"
- + "3) 'cdc_data_before_after': persist the 'before' and 'after' images at the same time");
+ .withDescription("Setting 'op_key_only' persists the 'op' and the record key only, "
+ + "setting 'data_before' persists the additional 'before' image, "
+ + "and setting 'data_before_after' persists the additional 'before' and 'after' images.");
// ------------------------------------------------------------------------
// Metadata table Options
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index af3e25ef2c0..42b94b58351 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -210,7 +210,7 @@ public class OptionsResolver {
*/
public static HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode(Configuration conf) {
String mode = conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE);
- return HoodieCDCSupplementalLoggingMode.parse(mode);
+ return HoodieCDCSupplementalLoggingMode.valueOf(mode);
}
/**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 4e162d8e2b8..8474c2a797a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -173,11 +173,11 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new Schema.Parser().parse(tableState.getAvroSchema()));
Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
switch (mode) {
- case WITH_BEFORE_AFTER:
+ case data_before_after:
return new BeforeAfterImageIterator(tablePath, tableState, hadoopConf, cdcSchema, fileSplit);
- case WITH_BEFORE:
+ case data_before:
return new BeforeImageIterator(conf, hadoopConf, tablePath, tableState, cdcSchema, fileSplit, imageManager);
- case OP_KEY:
+ case op_key_only:
return new RecordKeyImageIterator(conf, hadoopConf, tablePath, tableState, cdcSchema, fileSplit, imageManager);
default:
throw new AssertionError("Unexpected mode" + mode);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 10f2bcd095a..d733965b2eb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -162,7 +162,7 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.CDC_ENABLED, true)
- .option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.getValue())
+ .option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.name())
.end();
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source";
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 6d0bf731ccc..7563498bbb6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -324,7 +324,7 @@ public class TestInputFormat {
void testReadWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.CDC_ENABLED.key(), "true");
- options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.getValue());
+ options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.name());
beforeEach(HoodieTableType.COPY_ON_WRITE, options);
// write the insert data sets
@@ -365,7 +365,7 @@ public class TestInputFormat {
void testReadFromEarliestWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.CDC_ENABLED.key(), "true");
- options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.getValue());
+ options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.name());
options.put(FlinkOptions.READ_START_COMMIT.key(), "earliest");
beforeEach(HoodieTableType.COPY_ON_WRITE, options);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
index 4e90303b3b2..5b12a2ab218 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
@@ -18,24 +18,21 @@
package org.apache.hudi.cdc
-import org.apache.hudi.AvroConversionUtils
-import org.apache.hudi.DataSourceReadOptions
-import org.apache.hudi.HoodieDataSourceHelper
-import org.apache.hudi.HoodieTableSchema
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils._
-import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.TableSchemaResolver
import org.apache.hudi.common.table.cdc.HoodieCDCExtractor
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils._
import org.apache.hudi.common.table.log.InstantRange
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, HoodieDataSourceHelper, HoodieTableSchema}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
@@ -131,7 +128,7 @@ object CDCRelation {
/**
* CDC Schema For Spark.
- * Also it's schema when `hoodie.table.cdc.supplemental.logging.mode` is `cdc_data_before_after`.
+ * Also it's schema when `hoodie.table.cdc.supplemental.logging.mode` is [[data_before_after]].
* Here we use the debezium format.
*/
val FULL_CDC_SPARK_SCHEMA: StructType = {
@@ -146,7 +143,7 @@ object CDCRelation {
}
/**
- * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is `op_key`.
+ * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is [[op_key_only]].
*/
val MIN_CDC_SPARK_SCHEMA: StructType = {
StructType(
@@ -158,7 +155,7 @@ object CDCRelation {
}
/**
- * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is `cdc_data_before`.
+ * CDC Schema For Spark when `hoodie.table.cdc.supplemental.logging.mode` is [[data_before]].
*/
val CDC_WITH_BEFORE_SPARK_SCHEMA: StructType = {
StructType(
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 4768e4a3d87..767300f3659 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -18,40 +18,40 @@
package org.apache.hudi.cdc
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.BaseFileReader
-import org.apache.hudi.{AvroConversionUtils, AvroProjection, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport}
import org.apache.hudi.HoodieConversionUtils._
import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
-import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord, HoodieRecordMerger, HoodieRecordPayload}
+import org.apache.hudi.common.model._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._
import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
-import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
+import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.avro.Schema
-import org.apache.avro.generic.{GenericData, GenericRecord, GenericRecordBuilder, IndexedRecord}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.{Partition, SerializableWritable, TaskContext}
+import org.apache.hudi.{AvroConversionUtils, AvroProjection, HoodieFileIndex, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, HoodieUnsafeRDD, LogFileIterator, RecordMergingFileIterator, SparkAdapterSupport}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.HoodieAvroDeserializer
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Projection
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.{Partition, SerializableWritable, TaskContext}
import java.io.Closeable
import java.util.Properties
@@ -244,7 +244,7 @@ class HoodieCDCRDD(
/**
* Keep the after-image data. Only one case will use this:
- * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is 'op_key' or 'cdc_data_before'.
+ * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is [[op_key_only]] or [[data_before]].
*/
private var afterImageRecords: mutable.Map[String, InternalRow] = mutable.Map.empty
@@ -306,13 +306,13 @@ class HoodieCDCRDD(
case AS_IS =>
val record = cdcLogRecordIterator.next().asInstanceOf[GenericRecord]
cdcSupplementalLoggingMode match {
- case HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER =>
+ case `data_before_after` =>
recordToLoad.update(0, convertToUTF8String(String.valueOf(record.get(0))))
val before = record.get(2).asInstanceOf[GenericRecord]
recordToLoad.update(2, recordToJsonAsUTF8String(before))
val after = record.get(3).asInstanceOf[GenericRecord]
recordToLoad.update(3, recordToJsonAsUTF8String(after))
- case HoodieCDCSupplementalLoggingMode.WITH_BEFORE =>
+ case `data_before` =>
val row = cdcRecordDeserializer.deserialize(record).get.asInstanceOf[InternalRow]
val op = row.getString(0)
val recordKey = row.getString(1)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
index 7ad7ec10247..fce3f2289e6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieKey, HoodieLogFile, HoodieRecord}
-import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCUtils}
+import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.log.HoodieLogFormat
import org.apache.hudi.common.table.log.block.HoodieDataBlock
@@ -33,9 +33,11 @@ import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{data_before, op_key_only}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNull}
+
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -133,19 +135,19 @@ abstract class HoodieCDCTestBase extends HoodieClientTestBase {
records.toList
}
- protected def checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode: String,
- cdcSchema: Schema,
- dataSchema: Schema,
- cdcRecords: Seq[HoodieRecord[_]],
- newHoodieRecords: java.util.List[HoodieRecord[_]],
- op: HoodieCDCOperation): Unit = {
+ protected def checkCDCDataForInsertOrUpdate(loggingMode: HoodieCDCSupplementalLoggingMode,
+ cdcSchema: Schema,
+ dataSchema: Schema,
+ cdcRecords: Seq[HoodieRecord[_]],
+ newHoodieRecords: java.util.List[HoodieRecord[_]],
+ op: HoodieCDCOperation): Unit = {
val cdcRecord = cdcRecords.head.getData.asInstanceOf[GenericRecord]
// check schema
assertEquals(cdcRecord.getSchema, cdcSchema)
- if (cdcSupplementalLoggingMode == "cdc_op_key") {
+ if (loggingMode == op_key_only) {
// check record key
assert(cdcRecords.map(_.getData.asInstanceOf[GenericRecord].get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted)
- } else if (cdcSupplementalLoggingMode == "cdc_data_before") {
+ } else if (loggingMode == data_before) {
// check record key
assert(cdcRecords.map(_.getData.asInstanceOf[GenericRecord].get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted)
// check before
@@ -181,17 +183,17 @@ abstract class HoodieCDCTestBase extends HoodieClientTestBase {
}
}
- protected def checkCDCDataForDelete(cdcSupplementalLoggingMode: String,
- cdcSchema: Schema,
- cdcRecords: Seq[IndexedRecord],
- deletedKeys: java.util.List[HoodieKey]): Unit = {
+ protected def checkCDCDataForDelete(loggingMode: HoodieCDCSupplementalLoggingMode,
+ cdcSchema: Schema,
+ cdcRecords: Seq[IndexedRecord],
+ deletedKeys: java.util.List[HoodieKey]): Unit = {
val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord]
// check schema
assertEquals(cdcRecord.getSchema, cdcSchema)
- if (cdcSupplementalLoggingMode == "cdc_op_key") {
+ if (loggingMode == op_key_only) {
// check record key
assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted)
- } else if (cdcSupplementalLoggingMode == "cdc_data_before") {
+ } else if (loggingMode == data_before) {
// check record key
assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted)
} else {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index bc904853806..14b4f50700a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -20,16 +20,16 @@ package org.apache.hudi.functional.cdc
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.DataSourceWriteOptions
-import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils.schemaBySupplementalLoggingMode
+import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
-
import org.apache.spark.sql.SaveMode
-
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.CsvSource
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
import scala.collection.JavaConversions._
@@ -44,10 +44,10 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
* Step6: Bluk_Insert 20
*/
@ParameterizedTest
- @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after"))
- def testCOWDataSourceWrite(cdcSupplementalLoggingMode: String): Unit = {
+ @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+ def testCOWDataSourceWrite(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
val options = commonOpts ++ Map(
- HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode
+ HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> loggingMode.name()
)
var totalInsertedCnt = 0L
@@ -70,8 +70,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
val schemaResolver = new TableSchemaResolver(metaClient)
val dataSchema = schemaResolver.getTableAvroSchema(false)
- val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
- HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema)
+ val cdcSchema = schemaBySupplementalLoggingMode(loggingMode, dataSchema)
totalInsertedCnt += 100
val instant1 = metaClient.reloadActiveTimeline.lastInstant().get()
@@ -98,7 +97,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
// check the num of cdc data
assertEquals(cdcDataFromCDCLogFile2.size, 50)
// check record key, before, after according to the supplemental logging mode
- checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode, cdcSchema, dataSchema,
+ checkCDCDataForInsertOrUpdate(loggingMode, cdcSchema, dataSchema,
cdcDataFromCDCLogFile2, hoodieRecords2, HoodieCDCOperation.UPDATE)
val commitTime2 = instant2.getTimestamp
@@ -225,11 +224,11 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
* Step7: Upsert 30 With CLean
*/
@ParameterizedTest
- @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after"))
- def testMORDataSourceWrite(cdcSupplementalLoggingMode: String): Unit = {
+ @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+ def testMORDataSourceWrite(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
val options = commonOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
- HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode
+ HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> loggingMode.name()
)
var totalInsertedCnt = 0L
@@ -252,8 +251,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
val schemaResolver = new TableSchemaResolver(metaClient)
val dataSchema = schemaResolver.getTableAvroSchema(false)
- val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
- HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema)
+ val cdcSchema = schemaBySupplementalLoggingMode(loggingMode, dataSchema)
totalInsertedCnt += 100
val instant1 = metaClient.reloadActiveTimeline.lastInstant().get()
@@ -429,14 +427,14 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
*/
@ParameterizedTest
@CsvSource(Array(
- "COPY_ON_WRITE,cdc_data_before_after", "MERGE_ON_READ,cdc_data_before_after",
- "COPY_ON_WRITE,cdc_data_before", "MERGE_ON_READ,cdc_data_before",
- "COPY_ON_WRITE,cdc_op_key", "MERGE_ON_READ,cdc_op_key"))
- def testDataSourceWriteWithPartitionField(tableType: String, cdcSupplementalLoggingMode: String): Unit = {
+ "COPY_ON_WRITE,data_before_after", "MERGE_ON_READ,data_before_after",
+ "COPY_ON_WRITE,data_before", "MERGE_ON_READ,data_before",
+ "COPY_ON_WRITE,op_key_only", "MERGE_ON_READ,op_key_only"))
+ def testDataSourceWriteWithPartitionField(tableType: String, loggingMode: String): Unit = {
val options = commonOpts ++ Map(
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
- HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode
+ HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> loggingMode
)
var totalInsertedCnt = 0L
@@ -545,9 +543,9 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
}
@ParameterizedTest
- @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after"))
- def testCDCWithMultiBlocksAndLogFiles(cdcSupplementalLoggingMode: String): Unit = {
- val (blockSize, logFileSize) = if (cdcSupplementalLoggingMode == "cdc_op_key") {
+ @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+ def testCDCWithMultiBlocksAndLogFiles(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
+ val (blockSize, logFileSize) = if (loggingMode == op_key_only) {
// only op and key will be stored in cdc log file, we set the smaller values for the two configs.
// so that it can also write out more than one cdc log file
// and each of cdc log file has more that one data block as we expect.
@@ -556,7 +554,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
(2048, 5120)
}
val options = commonOpts ++ Map(
- HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode,
+ HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> loggingMode.name(),
"hoodie.logfile.data.block.max.size" -> blockSize.toString,
"hoodie.logfile.max.size" -> logFileSize.toString
)
@@ -576,8 +574,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
val schemaResolver = new TableSchemaResolver(metaClient)
val dataSchema = schemaResolver.getTableAvroSchema(false)
- val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
- HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema)
+ val cdcSchema = schemaBySupplementalLoggingMode(loggingMode, dataSchema)
// Upsert Operation
val hoodieRecords2 = dataGen.generateUniqueUpdates("001", 50)
@@ -595,7 +592,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
// check the num of cdc data
assertEquals(cdcDataFromCDCLogFile2.size, 50)
// check record key, before, after according to the supplemental logging mode
- checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode, cdcSchema, dataSchema,
+ checkCDCDataForInsertOrUpdate(loggingMode, cdcSchema, dataSchema,
cdcDataFromCDCLogFile2, hoodieRecords2, HoodieCDCOperation.UPDATE)
val commitTime2 = instant2.getTimestamp
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCStreamingSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCStreamingSuite.scala
index 873d55d64cd..28a993e0510 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCStreamingSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCStreamingSuite.scala
@@ -17,19 +17,17 @@
package org.apache.hudi.functional.cdc
-import org.apache.hudi.DataSourceReadOptions
-import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.config.HoodieWriteConfig
-
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.sql.QueryTest.checkAnswer
-import org.apache.spark.sql.{Column, Dataset, Row, SaveMode}
import org.apache.spark.sql.catalyst.expressions.{Add, If, Literal}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
-
+import org.apache.spark.sql.{Column, Dataset, Row, SaveMode}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.CsvSource
+import org.junit.jupiter.params.provider.EnumSource
class TestCDCStreamingSuite extends HoodieCDCTestBase {
@@ -45,8 +43,8 @@ class TestCDCStreamingSuite extends HoodieCDCTestBase {
* and write to country_to_population_tbl.
*/
@ParameterizedTest
- @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after"))
- def cdcStreaming(cdcSupplementalLoggingMode: String): Unit = {
+ @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+ def cdcStreaming(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
val commonOptions = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
@@ -69,7 +67,7 @@ class TestCDCStreamingSuite extends HoodieCDCTestBase {
userToCountryDF.write.format("hudi")
.options(commonOptions)
.option(HoodieTableConfig.CDC_ENABLED.key, "true")
- .option(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key, cdcSupplementalLoggingMode)
+ .option(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key, loggingMode.name())
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "userid")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
.option(HoodieWriteConfig.TBL_NAME.key, "user_to_country")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
index 60aa3c3e077..bec2230e5ab 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
@@ -19,10 +19,9 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.table.HoodieTableMetaClient
-
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{data_before, op_key_only}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
-
import org.junit.jupiter.api.Assertions.assertEquals
class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
@@ -54,7 +53,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
spark.sql(s"use $databaseName")
Seq("cow", "mor").foreach { tableType =>
- Seq("cdc_op_key", "cdc_data_before").foreach { cdcSupplementalLoggingMode =>
+ Seq(op_key_only, data_before).foreach { loggingMode =>
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -70,7 +69,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
| 'primaryKey' = 'id',
| 'preCombineField' = 'ts',
| 'hoodie.table.cdc.enabled' = 'true',
- | 'hoodie.table.cdc.supplemental.logging.mode' = '$cdcSupplementalLoggingMode',
+ | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}',
| type = '$tableType'
| )
| location '$basePath'
@@ -174,7 +173,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
spark.sql(s"use $databaseName")
Seq("cow", "mor").foreach { tableType =>
- Seq("cdc_op_key", "cdc_data_before").foreach { cdcSupplementalLoggingMode =>
+ Seq(op_key_only, data_before).foreach { loggingMode =>
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -192,7 +191,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
| 'primaryKey' = 'id',
| 'preCombineField' = 'ts',
| 'hoodie.table.cdc.enabled' = 'true',
- | 'hoodie.table.cdc.supplemental.logging.mode' = '$cdcSupplementalLoggingMode',
+ | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}',
| 'type' = '$tableType'
| )
| location '$basePath'