You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/01/20 01:47:47 UTC
[hudi] branch master updated: [HUDI-5559] Support CDC for flink bounded source (#7677)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0e9bb024fb9 [HUDI-5559] Support CDC for flink bounded source (#7677)
0e9bb024fb9 is described below
commit 0e9bb024fb990c8cd33b3ba84f3b759eedaecc63
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Jan 20 09:47:41 2023 +0800
[HUDI-5559] Support CDC for flink bounded source (#7677)
Propagates the change logs through bounded source in streaming execution mode.
---
.../hudi/common/table/cdc/HoodieCDCExtractor.java | 4 +-
.../apache/hudi/configuration/OptionsResolver.java | 12 +-
.../apache/hudi/source/IncrementalInputSplits.java | 166 ++++++++++++---------
.../org/apache/hudi/table/HoodieTableFactory.java | 3 +-
.../org/apache/hudi/table/HoodieTableSource.java | 38 +++--
.../apache/hudi/table/ITTestHoodieDataSource.java | 38 +++++
.../apache/hudi/table/format/TestInputFormat.java | 96 +++++++++++-
.../test/java/org/apache/hudi/utils/TestData.java | 13 ++
.../utils/factory/CollectSinkTableFactory.java | 11 +-
9 files changed, 274 insertions(+), 107 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index daf475623d1..994b7ea477e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -103,7 +103,7 @@ public class HoodieCDCExtractor {
}
private void init() {
- initInstantAndCommitMetadatas();
+ initInstantAndCommitMetadata();
}
/**
@@ -209,7 +209,7 @@ public class HoodieCDCExtractor {
*
* And, we need to recognize which is a 'replacecommit', that help to find the list of file group replaced.
*/
- private void initInstantAndCommitMetadatas() {
+ private void initInstantAndCommitMetadata() {
try {
Set<String> requiredActions = new HashSet<>(Arrays.asList(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION));
HoodieActiveTimeline activeTimeLine = metaClient.getActiveTimeline();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 619674145ac..af3e25ef2c0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -132,8 +132,9 @@ public class OptionsResolver {
* @return true if the source is read as streaming with changelog mode enabled
*/
public static boolean emitChangelog(Configuration conf) {
- return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
- && conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
+ return conf.getBoolean(FlinkOptions.READ_AS_STREAMING) && conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
+ || conf.getBoolean(FlinkOptions.READ_AS_STREAMING) && conf.getBoolean(FlinkOptions.CDC_ENABLED)
+ || isIncrementalQuery(conf) && conf.getBoolean(FlinkOptions.CDC_ENABLED);
}
/**
@@ -219,6 +220,13 @@ public class OptionsResolver {
return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue());
}
+ /**
+ * Returns whether the query is incremental.
+ */
+ public static boolean isIncrementalQuery(Configuration conf) {
+ return conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() || conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent();
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index a5bab0e5759..c3e7fe29d43 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCExtractor;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -128,11 +129,14 @@ public class IncrementalInputSplits implements Serializable {
*
* @param metaClient The meta client
* @param hadoopConf The hadoop configuration
+ * @param cdcEnabled Whether cdc is enabled
+ *
* @return The list of incremental input splits or empty if there are no new instants
*/
public Result inputSplits(
HoodieTableMetaClient metaClient,
- org.apache.hadoop.conf.Configuration hadoopConf) {
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ boolean cdcEnabled) {
HoodieTimeline commitTimeline = getReadTimeline(metaClient);
if (commitTimeline.empty()) {
LOG.warn("No splits found for the table under path " + path);
@@ -144,9 +148,35 @@ public class IncrementalInputSplits implements Serializable {
final boolean startFromEarliest = FlinkOptions.START_COMMIT_EARLIEST.equalsIgnoreCase(startCommit);
final boolean startOutOfRange = startCommit != null && commitTimeline.isBeforeTimelineStarts(startCommit);
final boolean endOutOfRange = endCommit != null && commitTimeline.isBeforeTimelineStarts(endCommit);
+ // We better add another premise: whether the endCommit is cleaned.
boolean fullTableScan = startFromEarliest || startOutOfRange || endOutOfRange;
- // Step1: find out the files to read, tries to read the files from the commit metadata first,
+ List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, null);
+
+ // Step1: generates the instant range
+ // if the specified end commit is archived, still uses the specified timestamp,
+ // else uses the latest filtered instant time
+ // (would be the latest instant time if the specified end commit is greater than the latest instant time)
+ final String rangeEnd = endOutOfRange || instants.isEmpty() ? endCommit : instants.get(instants.size() - 1).getTimestamp();
+ // keep the same semantics with streaming read, default start from the latest commit
+ final String rangeStart = startFromEarliest ? null : (startCommit == null ? rangeEnd : startCommit);
+ final InstantRange instantRange;
+ if (!fullTableScan) {
+ instantRange = InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
+ .rangeType(InstantRange.RangeType.CLOSE_CLOSE).nullableBoundary(cdcEnabled).build();
+ } else if (startFromEarliest && endCommit == null) {
+ // short-cut for snapshot read
+ instantRange = null;
+ } else {
+ instantRange = InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
+ .rangeType(InstantRange.RangeType.CLOSE_CLOSE).nullableBoundary(true).build();
+ }
+ // Step2: decides the read end commit
+ final String endInstant = endOutOfRange || endCommit == null
+ ? commitTimeline.lastInstant().get().getTimestamp()
+ : rangeEnd;
+
+ // Step3: find out the files to read, tries to read the files from the commit metadata first,
// fallback to full table scan if any of the following conditions matches:
// 1. there are files in metadata be deleted;
// 2. read from earliest
@@ -154,7 +184,6 @@ public class IncrementalInputSplits implements Serializable {
// 4. the end commit is archived
Set<String> readPartitions;
final FileStatus[] fileStatuses;
- List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, null);
if (fullTableScan) {
// scans the partitions and files directly.
FileIndex fileIndex = getFileIndex();
@@ -169,6 +198,12 @@ public class IncrementalInputSplits implements Serializable {
LOG.info("No new instant found for the table under path " + path + ", skip reading");
return Result.EMPTY;
}
+ if (cdcEnabled) {
+ // case1: cdc change log enabled
+ List<MergeOnReadInputSplit> inputSplits = getCdcInputSplits(metaClient, instantRange);
+ return Result.instance(inputSplits, endInstant);
+ }
+ // case2: normal incremental read
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
List<HoodieCommitMetadata> metadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
@@ -182,7 +217,6 @@ public class IncrementalInputSplits implements Serializable {
if (Arrays.stream(files).anyMatch(fileStatus -> !StreamerUtil.fileExists(fs, fileStatus.getPath()))) {
LOG.warn("Found deleted files in metadata, fall back to full table scan.");
// fallback to full table scan
- fullTableScan = true;
// reading from the earliest, scans the partitions and files directly.
FileIndex fileIndex = getFileIndex();
readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
@@ -201,30 +235,6 @@ public class IncrementalInputSplits implements Serializable {
return Result.EMPTY;
}
- // Step2: generates the instant range
- // if the specified end commit is archived, still uses the specified timestamp,
- // else uses the latest filtered instant time
- // (would be the latest instant time if the specified end commit is greater than the latest instant time)
- final String rangeEnd = endOutOfRange ? endCommit : instants.get(instants.size() - 1).getTimestamp();
- // keep the same semantics with streaming read, default start from the latest commit
- final String rangeStart = startFromEarliest ? null : (startCommit == null ? rangeEnd : startCommit);
- final InstantRange instantRange;
- if (!fullTableScan) {
- instantRange = InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
- .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
- } else if (startFromEarliest && endCommit == null) {
- // short-cut for snapshot read
- instantRange = null;
- } else {
- instantRange = InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
- .rangeType(InstantRange.RangeType.CLOSE_CLOSE).nullableBoundary(true).build();
- }
-
- // Step3: decides the read end commit
- final String endInstant = fullTableScan
- ? commitTimeline.lastInstant().get().getTimestamp()
- : instants.get(instants.size() - 1).getTimestamp();
-
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
fileStatuses, readPartitions, endInstant, instantRange, false);
@@ -237,6 +247,8 @@ public class IncrementalInputSplits implements Serializable {
* @param metaClient The meta client
* @param hadoopConf The hadoop configuration
* @param issuedInstant The last issued instant, only valid in streaming read
+ * @param cdcEnabled Whether cdc is enabled
+ *
* @return The list of incremental input splits or empty if there are no new instants
*/
public Result inputSplits(
@@ -290,58 +302,44 @@ public class IncrementalInputSplits implements Serializable {
// streaming read
if (cdcEnabled) {
// case1: cdc change log enabled
- HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
final String endInstant = instantToIssue.getTimestamp();
- Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
-
- if (fileSplits.isEmpty()) {
- LOG.warn("No change logs found for reading in path: " + path);
- return Result.EMPTY;
- }
-
- final AtomicInteger cnt = new AtomicInteger(0);
- List<MergeOnReadInputSplit> inputSplits = fileSplits.entrySet().stream()
- .map(splits ->
- new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), maxCompactionMemoryInBytes,
- splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
- .collect(Collectors.toList());
+ List<MergeOnReadInputSplit> inputSplits = getCdcInputSplits(metaClient, instantRange);
return Result.instance(inputSplits, endInstant);
- } else {
- // case2: normal streaming read
- String tableName = conf.getString(FlinkOptions.TABLE_NAME);
- List<HoodieCommitMetadata> activeMetadataList = instants.stream()
- .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
- List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
- if (archivedMetadataList.size() > 0) {
- LOG.warn("\n"
- + "--------------------------------------------------------------------------------\n"
- + "---------- caution: the reader has fall behind too much from the writer,\n"
- + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
- + "--------------------------------------------------------------------------------");
- }
- List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
- // IMPORTANT: the merged metadata list must be in ascending order by instant time
- ? mergeList(archivedMetadataList, activeMetadataList)
- : activeMetadataList;
+ }
+ // case2: normal streaming read
+ String tableName = conf.getString(FlinkOptions.TABLE_NAME);
+ List<HoodieCommitMetadata> activeMetadataList = instants.stream()
+ .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
+ List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
+ if (archivedMetadataList.size() > 0) {
+ LOG.warn("\n"
+ + "--------------------------------------------------------------------------------\n"
+ + "---------- caution: the reader has fall behind too much from the writer,\n"
+ + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+ + "--------------------------------------------------------------------------------");
+ }
+ List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
+ // IMPORTANT: the merged metadata list must be in ascending order by instant time
+ ? mergeList(archivedMetadataList, activeMetadataList)
+ : activeMetadataList;
- readPartitions = getReadPartitions(metadataList);
- if (readPartitions.size() == 0) {
- LOG.warn("No partitions found for reading under path: " + path);
- return Result.EMPTY;
- }
- fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
+ readPartitions = getReadPartitions(metadataList);
+ if (readPartitions.size() == 0) {
+ LOG.warn("No partitions found for reading under path: " + path);
+ return Result.EMPTY;
+ }
+ fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
- if (fileStatuses.length == 0) {
- LOG.warn("No files found for reading under path: " + path);
- return Result.EMPTY;
- }
+ if (fileStatuses.length == 0) {
+ LOG.warn("No files found for reading under path: " + path);
+ return Result.EMPTY;
+ }
- final String endInstant = instantToIssue.getTimestamp();
- List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
- fileStatuses, readPartitions, endInstant, instantRange, skipCompaction);
+ final String endInstant = instantToIssue.getTimestamp();
+ List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
+ fileStatuses, readPartitions, endInstant, instantRange, skipCompaction);
- return Result.instance(inputSplits, endInstant);
- }
+ return Result.instance(inputSplits, endInstant);
}
}
@@ -383,6 +381,7 @@ public class IncrementalInputSplits implements Serializable {
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString())
+ .filter(logPath -> !logPath.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
.collect(Collectors.toList()));
String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
@@ -393,6 +392,25 @@ public class IncrementalInputSplits implements Serializable {
.collect(Collectors.toList());
}
+ private List<MergeOnReadInputSplit> getCdcInputSplits(
+ HoodieTableMetaClient metaClient,
+ InstantRange instantRange) {
+ HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
+ Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
+
+ if (fileSplits.isEmpty()) {
+ LOG.warn("No change logs found for reading in path: " + path);
+ return Collections.emptyList();
+ }
+
+ final AtomicInteger cnt = new AtomicInteger(0);
+ return fileSplits.entrySet().stream()
+ .map(splits ->
+ new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), maxCompactionMemoryInBytes,
+ splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
+ .collect(Collectors.toList());
+ }
+
private static Stream<FileSlice> getFileSlices(
HoodieTableFileSystemView fsView,
String relPartitionPath,
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 1cf66ea3437..c7a79561b3f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -316,8 +316,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
* Sets up the read options from the table definition.
*/
private static void setupReadOptions(Configuration conf) {
- if (!conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
- && (conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() || conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent())) {
+ if (OptionsResolver.isIncrementalQuery(conf)) {
conf.setString(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_INCREMENTAL);
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 90369889db8..fdbf425ed66 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -215,6 +215,7 @@ public class HoodieTableSource implements
@Override
public ChangelogMode getChangelogMode() {
// when read as streaming and changelog mode is enabled, emit as FULL mode;
+ // when read as incremental and cdc is enabled, emit as FULL mode;
// when all the changes are compacted or read as batch, emit as INSERT mode.
return OptionsResolver.emitChangelog(conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly();
}
@@ -380,14 +381,18 @@ public class HoodieTableSource implements
.rowType(this.tableRowType)
.maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
.requiredPartitions(getRequiredPartitionPaths()).build();
- final IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, hadoopConf);
+ final boolean cdcEnabled = this.conf.getBoolean(FlinkOptions.CDC_ENABLED);
+ final IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, hadoopConf, cdcEnabled);
if (result.isEmpty()) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for incremental read, returns empty collection instead");
return InputFormats.EMPTY_INPUT_FORMAT;
+ } else if (cdcEnabled) {
+ return cdcInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, result.getInputSplits());
+ } else {
+ return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
+ rowDataType, result.getInputSplits(), false);
}
- return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
- rowDataType, result.getInputSplits(), false);
default:
String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s', '%s'] are supported now", queryType,
FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, FlinkOptions.QUERY_TYPE_INCREMENTAL);
@@ -403,19 +408,22 @@ public class HoodieTableSource implements
final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
- if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
- final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
- boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
- if (this.conf.getBoolean(FlinkOptions.CDC_ENABLED)) {
- return cdcInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList());
- } else {
- return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
- rowDataType, Collections.emptyList(), emitDelete);
- }
+ switch (queryType) {
+ case FlinkOptions.QUERY_TYPE_SNAPSHOT:
+ case FlinkOptions.QUERY_TYPE_INCREMENTAL:
+ final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
+ boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
+ if (this.conf.getBoolean(FlinkOptions.CDC_ENABLED)) {
+ return cdcInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList());
+ } else {
+ return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
+ rowDataType, Collections.emptyList(), emitDelete);
+ }
+ default:
+ String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s'] are supported now", queryType,
+ FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_INCREMENTAL);
+ throw new HoodieException(errMsg);
}
- String errMsg = String.format("Invalid query type : '%s', options ['%s'] are supported now", queryType,
- FlinkOptions.QUERY_TYPE_SNAPSHOT);
- throw new HoodieException(errMsg);
}
/**
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index f831910de9b..10f2bcd095a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1258,6 +1258,44 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result, TestData.dataSetInsert(5, 6));
}
+ @Test
+ void testReadChangelogIncremental() throws Exception {
+ TableEnvironment tableEnv = streamTableEnv;
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.setString(FlinkOptions.TABLE_NAME, "t1");
+ conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); // for batch upsert
+ conf.setBoolean(FlinkOptions.CDC_ENABLED, true);
+
+ // write 3 batches of the same data set
+ TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+ TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+ TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+
+ String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.READ_START_COMMIT, latestCommit)
+ .option(FlinkOptions.CDC_ENABLED, true)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ List<Row> result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result1, TestData.dataSetUpsert(2, 1));
+
+ // write another 10 batches of dataset
+ for (int i = 0; i < 10; i++) {
+ TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+ }
+
+ String firstCommit = TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
+ final String query = String.format("select count(*) from t1/*+ options('read.start-commit'='%s')*/", firstCommit);
+ List<Row> result2 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery(query).execute().collect());
+ assertRowsEquals(result2.subList(result2.size() - 2, result2.size()), "[-U[1], +U[2]]");
+ }
+
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws Exception {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index affb92e8846..6d0bf731ccc 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -738,7 +738,93 @@ public class TestInputFormat {
conf.setString(FlinkOptions.READ_END_COMMIT, "002");
this.tableSource = getTableSource(conf);
InputFormat<RowData, ?> inputFormat6 = this.tableSource.getInputFormat();
- assertThat(inputFormat6, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> actual6 = readData(inputFormat6);
+ TestData.assertRowDataEquals(actual6, Collections.emptyList());
+ }
+
+ @Test
+ void testReadChangelogIncrementally() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_INCREMENTAL);
+ options.put(FlinkOptions.CDC_ENABLED.key(), "true");
+ options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true"); // for batch update
+ beforeEach(HoodieTableType.COPY_ON_WRITE, options);
+
+ // write 3 commits first
+ // write the same dataset 3 times to generate changelog
+ for (int i = 0; i < 3; i++) {
+ List<RowData> dataset = TestData.dataSetInsert(1, 2);
+ TestData.writeDataAsBatch(dataset, conf);
+ }
+
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(conf));
+ List<String> commits = metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream()
+ .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+ assertThat(commits.size(), is(3));
+
+ // only the start commit
+ conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(1));
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat1 = this.tableSource.getInputFormat();
+ assertThat(inputFormat1, instanceOf(CdcInputFormat.class));
+
+ List<RowData> actual1 = readData(inputFormat1);
+ final List<RowData> expected1 = TestData.dataSetUpsert(2, 1, 2, 1);
+ TestData.assertRowDataEquals(actual1, expected1);
+
+ // only the start commit: earliest
+ conf.setString(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat2 = this.tableSource.getInputFormat();
+ assertThat(inputFormat2, instanceOf(CdcInputFormat.class));
+
+ List<RowData> actual2 = readData(inputFormat2);
+ final List<RowData> expected2 = TestData.dataSetInsert(1, 2);
+ TestData.assertRowDataEquals(actual2, expected2);
+
+ // start and end commit: [start commit, end commit]
+ conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(0));
+ conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat3 = this.tableSource.getInputFormat();
+ assertThat(inputFormat3, instanceOf(CdcInputFormat.class));
+
+ List<RowData> actual3 = readData(inputFormat3);
+ final List<RowData> expected3 = new ArrayList<>(TestData.dataSetInsert(1));
+ expected3.addAll(TestData.dataSetUpsert(1));
+ expected3.addAll(TestData.dataSetInsert(2));
+ expected3.addAll(TestData.dataSetUpsert(2));
+ TestData.assertRowDataEquals(actual3, expected3);
+
+ // only the end commit: point in time query
+ conf.removeConfig(FlinkOptions.READ_START_COMMIT);
+ conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat4 = this.tableSource.getInputFormat();
+ assertThat(inputFormat4, instanceOf(CdcInputFormat.class));
+
+ List<RowData> actual4 = readData(inputFormat4);
+ final List<RowData> expected4 = TestData.dataSetUpsert(2, 1);
+ TestData.assertRowDataEquals(actual4, expected4);
+
+ // start and end commit: start commit out of range
+ conf.setString(FlinkOptions.READ_START_COMMIT, "000");
+ conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat5 = this.tableSource.getInputFormat();
+ assertThat(inputFormat5, instanceOf(CdcInputFormat.class));
+
+ List<RowData> actual5 = readData(inputFormat5);
+ final List<RowData> expected5 = TestData.dataSetInsert(1, 2);
+ TestData.assertRowDataEquals(actual5, expected5);
+
+ // start and end commit: both are out of range
+ conf.setString(FlinkOptions.READ_START_COMMIT, "001");
+ conf.setString(FlinkOptions.READ_END_COMMIT, "002");
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat6 = this.tableSource.getInputFormat();
List<RowData> actual6 = readData(inputFormat6);
TestData.assertRowDataEquals(actual6, Collections.emptyList());
@@ -836,16 +922,16 @@ public class TestInputFormat {
final List<RowData> expected3 = TestData.dataSetInsert(3, 4);
TestData.assertRowDataEquals(actual3, expected3);
- // start and end commit: start is archived and cleaned, end is active
+ // start and end commit: start is archived and cleaned, end is active and cleaned
conf.setString(FlinkOptions.READ_START_COMMIT, archivedCommits.get(1));
conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(0));
this.tableSource = getTableSource(conf);
InputFormat<RowData, ?> inputFormat4 = this.tableSource.getInputFormat();
- assertThat(inputFormat4, instanceOf(MergeOnReadInputFormat.class));
+ // assertThat(inputFormat4, instanceOf(MergeOnReadInputFormat.class));
List<RowData> actual4 = readData(inputFormat4);
- final List<RowData> expected4 = TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
- TestData.assertRowDataEquals(actual4, expected4);
+ // final List<RowData> expected4 = TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+ TestData.assertRowDataEquals(actual4, Collections.emptyList());
}
@ParameterizedTest
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 7d798fff9f1..f05e97cad00 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -366,6 +366,19 @@ public class TestData {
return inserts;
}
+ public static List<RowData> dataSetUpsert(int... ids) {
+ List<RowData> inserts = new ArrayList<>();
+ Arrays.stream(ids).forEach(i -> {
+ inserts.add(
+ updateBeforeRow(StringData.fromString("id" + i), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(i), StringData.fromString("par1")));
+ inserts.add(
+ updateAfterRow(StringData.fromString("id" + i), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(i), StringData.fromString("par1")));
+ });
+ return inserts;
+ }
+
public static List<RowData> filterOddRows(List<RowData> rows) {
return filterRowsByIndexPredicate(rows, i -> i % 2 != 0);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
index 33e9d376588..da0761a7542 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
@@ -18,6 +18,8 @@
package org.apache.hudi.utils.factory;
+import org.apache.hudi.util.ChangelogModes;
+
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -37,7 +39,6 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
import java.util.ArrayList;
import java.util.Collections;
@@ -50,7 +51,7 @@ import java.util.Set;
* Factory for CollectTableSink.
*
* <p>Note: The CollectTableSink collects all the data of a table into a global collection {@code RESULT},
- * so the tests should executed in single thread and the table name should be the same.
+ * so the tests should execute in single thread and the table name should be the same.
*/
public class CollectSinkTableFactory implements DynamicTableSinkFactory {
public static final String FACTORY_ID = "collect";
@@ -104,11 +105,7 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
- return ChangelogMode.newBuilder()
- .addContainedKind(RowKind.INSERT)
- .addContainedKind(RowKind.DELETE)
- .addContainedKind(RowKind.UPDATE_AFTER)
- .build();
+ return ChangelogModes.FULL;
}
@Override