Posted to commits@hudi.apache.org by da...@apache.org on 2023/01/20 01:47:47 UTC

[hudi] branch master updated: [HUDI-5559] Support CDC for flink bounded source (#7677)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e9bb024fb9 [HUDI-5559] Support CDC for flink bounded source (#7677)
0e9bb024fb9 is described below

commit 0e9bb024fb990c8cd33b3ba84f3b759eedaecc63
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Jan 20 09:47:41 2023 +0800

    [HUDI-5559] Support CDC for flink bounded source (#7677)
    
    Propagates the change logs through bounded source in streaming execution mode.
---
 .../hudi/common/table/cdc/HoodieCDCExtractor.java  |   4 +-
 .../apache/hudi/configuration/OptionsResolver.java |  12 +-
 .../apache/hudi/source/IncrementalInputSplits.java | 166 ++++++++++++---------
 .../org/apache/hudi/table/HoodieTableFactory.java  |   3 +-
 .../org/apache/hudi/table/HoodieTableSource.java   |  38 +++--
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  38 +++++
 .../apache/hudi/table/format/TestInputFormat.java  |  96 +++++++++++-
 .../test/java/org/apache/hudi/utils/TestData.java  |  13 ++
 .../utils/factory/CollectSinkTableFactory.java     |  11 +-
 9 files changed, 274 insertions(+), 107 deletions(-)
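
For orientation, a minimal usage sketch of what this change enables: a bounded incremental read that emits the full changelog. It assumes the Flink SQL option keys ('cdc.enabled', 'read.start-commit', 'read.end-commit') backing the FlinkOptions referenced in this patch; the table schema, path, and commit timestamps below are placeholders, not part of the commit.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class HudiCdcBoundedReadExample {
      public static void main(String[] args) {
        // Streaming execution mode so the -U/+U/-D change rows are propagated,
        // even though the Hudi source itself is bounded (incremental query).
        TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        tableEnv.executeSql(
            "CREATE TABLE hudi_cdc_source (\n"
                + "  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,\n"
                + "  name VARCHAR(10),\n"
                + "  age INT,\n"
                + "  ts TIMESTAMP(3),\n"
                + "  `partition` VARCHAR(20)\n"
                + ") WITH (\n"
                + "  'connector' = 'hudi',\n"
                + "  'path' = 'file:///tmp/hudi_table',\n"
                + "  'cdc.enabled' = 'true',\n"
                + "  'read.start-commit' = '20230119000000000',\n"
                + "  'read.end-commit' = '20230120000000000'\n"
                + ")");

        // With this patch the bounded incremental scan emits +I/-U/+U/-D rows
        // instead of an insert-only snapshot.
        tableEnv.executeSql("SELECT * FROM hudi_cdc_source").print();
      }
    }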

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index daf475623d1..994b7ea477e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -103,7 +103,7 @@ public class HoodieCDCExtractor {
   }
 
   private void init() {
-    initInstantAndCommitMetadatas();
+    initInstantAndCommitMetadata();
   }
 
   /**
@@ -209,7 +209,7 @@ public class HoodieCDCExtractor {
    *
   *  And, we need to recognize which instant is a 'replacecommit', which helps to find the list of replaced file groups.
    */
-  private void initInstantAndCommitMetadatas() {
+  private void initInstantAndCommitMetadata() {
     try {
       Set<String> requiredActions = new HashSet<>(Arrays.asList(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION));
       HoodieActiveTimeline activeTimeLine = metaClient.getActiveTimeline();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 619674145ac..af3e25ef2c0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -132,8 +132,9 @@ public class OptionsResolver {
    * @return true if the source is read as streaming with changelog mode enabled
    */
   public static boolean emitChangelog(Configuration conf) {
-    return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
-        && conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
+    return conf.getBoolean(FlinkOptions.READ_AS_STREAMING) && conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
+        || conf.getBoolean(FlinkOptions.READ_AS_STREAMING) && conf.getBoolean(FlinkOptions.CDC_ENABLED)
+        || isIncrementalQuery(conf) && conf.getBoolean(FlinkOptions.CDC_ENABLED);
   }
 
   /**
@@ -219,6 +220,13 @@ public class OptionsResolver {
     return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue());
   }
 
+  /**
+   * Returns whether the query is incremental.
+   */
+  public static boolean isIncrementalQuery(Configuration conf) {
+    return conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() || conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent();
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
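
A small sketch of the broadened emitChangelog() contract, using only the option setters and resolver methods that appear in this patch; the commit timestamp is a placeholder.

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.configuration.OptionsResolver;

    public class EmitChangelogSketch {
      public static void main(String[] args) {
        // Bounded incremental query with CDC enabled: no streaming read required.
        Configuration conf = new Configuration();
        conf.setBoolean(FlinkOptions.CDC_ENABLED, true);
        conf.setString(FlinkOptions.READ_START_COMMIT, "20230119000000000");

        // isIncrementalQuery(conf) is true because a start commit is present,
        // so emitChangelog(conf) now returns true for this configuration as well.
        System.out.println(OptionsResolver.isIncrementalQuery(conf)); // true
        System.out.println(OptionsResolver.emitChangelog(conf));      // true
      }
    }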
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index a5bab0e5759..c3e7fe29d43 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.cdc.HoodieCDCExtractor;
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -128,11 +129,14 @@ public class IncrementalInputSplits implements Serializable {
    *
    * @param metaClient The meta client
    * @param hadoopConf The hadoop configuration
+   * @param cdcEnabled Whether cdc is enabled
+   *
    * @return The list of incremental input splits or empty if there are no new instants
    */
   public Result inputSplits(
       HoodieTableMetaClient metaClient,
-      org.apache.hadoop.conf.Configuration hadoopConf) {
+      org.apache.hadoop.conf.Configuration hadoopConf,
+      boolean cdcEnabled) {
     HoodieTimeline commitTimeline = getReadTimeline(metaClient);
     if (commitTimeline.empty()) {
       LOG.warn("No splits found for the table under path " + path);
@@ -144,9 +148,35 @@ public class IncrementalInputSplits implements Serializable {
     final boolean startFromEarliest = FlinkOptions.START_COMMIT_EARLIEST.equalsIgnoreCase(startCommit);
     final boolean startOutOfRange = startCommit != null && commitTimeline.isBeforeTimelineStarts(startCommit);
     final boolean endOutOfRange = endCommit != null && commitTimeline.isBeforeTimelineStarts(endCommit);
+    // We had better add another premise: whether the endCommit has been cleaned.
     boolean fullTableScan = startFromEarliest || startOutOfRange || endOutOfRange;
 
-    // Step1: find out the files to read, tries to read the files from the commit metadata first,
+    List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, null);
+
+    // Step1: generates the instant range
+    // if the specified end commit is archived, still uses the specified timestamp,
+    // else uses the latest filtered instant time
+    // (would be the latest instant time if the specified end commit is greater than the latest instant time)
+    final String rangeEnd = endOutOfRange || instants.isEmpty() ? endCommit : instants.get(instants.size() - 1).getTimestamp();
+    // keep the same semantics with streaming read, default start from the latest commit
+    final String rangeStart = startFromEarliest ? null : (startCommit == null ? rangeEnd : startCommit);
+    final InstantRange instantRange;
+    if (!fullTableScan) {
+      instantRange = InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
+          .rangeType(InstantRange.RangeType.CLOSE_CLOSE).nullableBoundary(cdcEnabled).build();
+    } else if (startFromEarliest && endCommit == null) {
+      // short-cut for snapshot read
+      instantRange = null;
+    } else {
+      instantRange = InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
+          .rangeType(InstantRange.RangeType.CLOSE_CLOSE).nullableBoundary(true).build();
+    }
+    // Step2: decides the read end commit
+    final String endInstant = endOutOfRange || endCommit == null
+        ? commitTimeline.lastInstant().get().getTimestamp()
+        : rangeEnd;
+
+    // Step3: find out the files to read; tries to read the files from the commit metadata first,
     // and falls back to a full table scan if any of the following conditions match:
     //   1. some files referenced in the metadata have been deleted;
     //   2. read from earliest
@@ -154,7 +184,6 @@ public class IncrementalInputSplits implements Serializable {
     //   4. the end commit is archived
     Set<String> readPartitions;
     final FileStatus[] fileStatuses;
-    List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, null);
     if (fullTableScan) {
       // scans the partitions and files directly.
       FileIndex fileIndex = getFileIndex();
@@ -169,6 +198,12 @@ public class IncrementalInputSplits implements Serializable {
         LOG.info("No new instant found for the table under path " + path + ", skip reading");
         return Result.EMPTY;
       }
+      if (cdcEnabled) {
+        // case1: cdc change log enabled
+        List<MergeOnReadInputSplit> inputSplits = getCdcInputSplits(metaClient, instantRange);
+        return Result.instance(inputSplits, endInstant);
+      }
+      // case2: normal incremental read
       String tableName = conf.getString(FlinkOptions.TABLE_NAME);
       List<HoodieCommitMetadata> metadataList = instants.stream()
           .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
@@ -182,7 +217,6 @@ public class IncrementalInputSplits implements Serializable {
       if (Arrays.stream(files).anyMatch(fileStatus -> !StreamerUtil.fileExists(fs, fileStatus.getPath()))) {
         LOG.warn("Found deleted files in metadata, fall back to full table scan.");
         // fallback to full table scan
-        fullTableScan = true;
         // reading from the earliest, scans the partitions and files directly.
         FileIndex fileIndex = getFileIndex();
         readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
@@ -201,30 +235,6 @@ public class IncrementalInputSplits implements Serializable {
       return Result.EMPTY;
     }
 
-    // Step2: generates the instant range
-    // if the specified end commit is archived, still uses the specified timestamp,
-    // else uses the latest filtered instant time
-    // (would be the latest instant time if the specified end commit is greater than the latest instant time)
-    final String rangeEnd = endOutOfRange ? endCommit : instants.get(instants.size() - 1).getTimestamp();
-    // keep the same semantics with streaming read, default start from the latest commit
-    final String rangeStart = startFromEarliest ? null : (startCommit == null ? rangeEnd : startCommit);
-    final InstantRange instantRange;
-    if (!fullTableScan) {
-      instantRange = InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
-          .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
-    } else if (startFromEarliest && endCommit == null) {
-      // short-cut for snapshot read
-      instantRange = null;
-    } else {
-      instantRange = InstantRange.builder().startInstant(rangeStart).endInstant(rangeEnd)
-          .rangeType(InstantRange.RangeType.CLOSE_CLOSE).nullableBoundary(true).build();
-    }
-
-    // Step3: decides the read end commit
-    final String endInstant = fullTableScan
-        ? commitTimeline.lastInstant().get().getTimestamp()
-        : instants.get(instants.size() - 1).getTimestamp();
-
     List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
         fileStatuses, readPartitions, endInstant, instantRange, false);
 
@@ -237,6 +247,8 @@ public class IncrementalInputSplits implements Serializable {
    * @param metaClient    The meta client
    * @param hadoopConf    The hadoop configuration
    * @param issuedInstant The last issued instant, only valid in streaming read
+   * @param cdcEnabled    Whether cdc is enabled
+   *
    * @return The list of incremental input splits or empty if there are no new instants
    */
   public Result inputSplits(
@@ -290,58 +302,44 @@ public class IncrementalInputSplits implements Serializable {
       // streaming read
       if (cdcEnabled) {
         // case1: cdc change log enabled
-        HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
         final String endInstant = instantToIssue.getTimestamp();
-        Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
-
-        if (fileSplits.isEmpty()) {
-          LOG.warn("No change logs found for reading in path: " + path);
-          return Result.EMPTY;
-        }
-
-        final AtomicInteger cnt = new AtomicInteger(0);
-        List<MergeOnReadInputSplit> inputSplits = fileSplits.entrySet().stream()
-            .map(splits ->
-                new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), maxCompactionMemoryInBytes,
-                    splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
-            .collect(Collectors.toList());
+        List<MergeOnReadInputSplit> inputSplits = getCdcInputSplits(metaClient, instantRange);
         return Result.instance(inputSplits, endInstant);
-      } else {
-        // case2: normal streaming read
-        String tableName = conf.getString(FlinkOptions.TABLE_NAME);
-        List<HoodieCommitMetadata> activeMetadataList = instants.stream()
-            .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
-        List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
-        if (archivedMetadataList.size() > 0) {
-          LOG.warn("\n"
-              + "--------------------------------------------------------------------------------\n"
-              + "---------- caution: the reader has fall behind too much from the writer,\n"
-              + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
-              + "--------------------------------------------------------------------------------");
-        }
-        List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
-            // IMPORTANT: the merged metadata list must be in ascending order by instant time
-            ? mergeList(archivedMetadataList, activeMetadataList)
-            : activeMetadataList;
+      }
+      // case2: normal streaming read
+      String tableName = conf.getString(FlinkOptions.TABLE_NAME);
+      List<HoodieCommitMetadata> activeMetadataList = instants.stream()
+          .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
+      List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
+      if (archivedMetadataList.size() > 0) {
+        LOG.warn("\n"
+            + "--------------------------------------------------------------------------------\n"
+            + "---------- caution: the reader has fall behind too much from the writer,\n"
+            + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+            + "--------------------------------------------------------------------------------");
+      }
+      List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
+          // IMPORTANT: the merged metadata list must be in ascending order by instant time
+          ? mergeList(archivedMetadataList, activeMetadataList)
+          : activeMetadataList;
 
-        readPartitions = getReadPartitions(metadataList);
-        if (readPartitions.size() == 0) {
-          LOG.warn("No partitions found for reading under path: " + path);
-          return Result.EMPTY;
-        }
-        fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
+      readPartitions = getReadPartitions(metadataList);
+      if (readPartitions.size() == 0) {
+        LOG.warn("No partitions found for reading under path: " + path);
+        return Result.EMPTY;
+      }
+      fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
 
-        if (fileStatuses.length == 0) {
-          LOG.warn("No files found for reading under path: " + path);
-          return Result.EMPTY;
-        }
+      if (fileStatuses.length == 0) {
+        LOG.warn("No files found for reading under path: " + path);
+        return Result.EMPTY;
+      }
 
-        final String endInstant = instantToIssue.getTimestamp();
-        List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
-            fileStatuses, readPartitions, endInstant, instantRange, skipCompaction);
+      final String endInstant = instantToIssue.getTimestamp();
+      List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
+          fileStatuses, readPartitions, endInstant, instantRange, skipCompaction);
 
-        return Result.instance(inputSplits, endInstant);
-      }
+      return Result.instance(inputSplits, endInstant);
     }
   }
 
@@ -383,6 +381,7 @@ public class IncrementalInputSplits implements Serializable {
               Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
                   .sorted(HoodieLogFile.getLogFileComparator())
                   .map(logFile -> logFile.getPath().toString())
+                  .filter(logPath -> !logPath.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
                   .collect(Collectors.toList()));
               String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
               return new MergeOnReadInputSplit(cnt.getAndAdd(1),
@@ -393,6 +392,25 @@ public class IncrementalInputSplits implements Serializable {
         .collect(Collectors.toList());
   }
 
+  private List<MergeOnReadInputSplit> getCdcInputSplits(
+      HoodieTableMetaClient metaClient,
+      InstantRange instantRange) {
+    HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
+    Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
+
+    if (fileSplits.isEmpty()) {
+      LOG.warn("No change logs found for reading in path: " + path);
+      return Collections.emptyList();
+    }
+
+    final AtomicInteger cnt = new AtomicInteger(0);
+    return fileSplits.entrySet().stream()
+        .map(splits ->
+            new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), maxCompactionMemoryInBytes,
+                splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
+        .collect(Collectors.toList());
+  }
+
   private static Stream<FileSlice> getFileSlices(
       HoodieTableFileSystemView fsView,
       String relPartitionPath,
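
For reference, a minimal sketch of the instant range that Step1 above builds for a bounded CDC read, using the builder calls shown in the hunk; the commit timestamps are placeholders.

    import org.apache.hudi.common.table.log.InstantRange;

    public class InstantRangeSketch {
      public static void main(String[] args) {
        String rangeStart = "20230119000000000"; // placeholder start commit
        String rangeEnd = "20230120000000000";   // placeholder end commit

        // Both boundaries are inclusive (CLOSE_CLOSE); with cdc enabled a boundary
        // may also be null, so a missing start or end commit still yields a
        // usable range instead of being rejected.
        InstantRange instantRange = InstantRange.builder()
            .startInstant(rangeStart)
            .endInstant(rangeEnd)
            .rangeType(InstantRange.RangeType.CLOSE_CLOSE)
            .nullableBoundary(true) // corresponds to cdcEnabled in the patch
            .build();

        System.out.println(instantRange);
      }
    }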
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 1cf66ea3437..c7a79561b3f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -316,8 +316,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
    * Sets up the read options from the table definition.
    */
   private static void setupReadOptions(Configuration conf) {
-    if (!conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
-        && (conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() || conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent())) {
+    if (OptionsResolver.isIncrementalQuery(conf)) {
       conf.setString(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_INCREMENTAL);
     }
   }
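
One behavioral consequence of this refactor, sketched with the same helpers: the old condition additionally required 'read.streaming.enabled' to be false, while the new check keys only on the presence of a start or end commit. The commit timestamp is a placeholder.

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.configuration.OptionsResolver;

    public class SetupReadOptionsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
        conf.setString(FlinkOptions.READ_START_COMMIT, "20230119000000000");

        // Mirrors setupReadOptions: any start/end commit switches the query type.
        if (OptionsResolver.isIncrementalQuery(conf)) {
          conf.setString(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_INCREMENTAL);
        }
        System.out.println(conf.getString(FlinkOptions.QUERY_TYPE)); // incremental query type
      }
    }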
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 90369889db8..fdbf425ed66 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -215,6 +215,7 @@ public class HoodieTableSource implements
   @Override
   public ChangelogMode getChangelogMode() {
     // when read as streaming and changelog mode is enabled, emit as FULL mode;
+    // when read as incremental and cdc is enabled, emit as FULL mode;
     // when all the changes are compacted or read as batch, emit as INSERT mode.
     return OptionsResolver.emitChangelog(conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly();
   }
@@ -380,14 +381,18 @@ public class HoodieTableSource implements
             .rowType(this.tableRowType)
             .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
             .requiredPartitions(getRequiredPartitionPaths()).build();
-        final IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, hadoopConf);
+        final boolean cdcEnabled = this.conf.getBoolean(FlinkOptions.CDC_ENABLED);
+        final IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, hadoopConf, cdcEnabled);
         if (result.isEmpty()) {
           // When there is no input splits, just return an empty source.
           LOG.warn("No input splits generate for incremental read, returns empty collection instead");
           return InputFormats.EMPTY_INPUT_FORMAT;
+        } else if (cdcEnabled) {
+          return cdcInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, result.getInputSplits());
+        } else {
+          return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
+              rowDataType, result.getInputSplits(), false);
         }
-        return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
-            rowDataType, result.getInputSplits(), false);
       default:
         String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s', '%s'] are supported now", queryType,
             FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, FlinkOptions.QUERY_TYPE_INCREMENTAL);
@@ -403,19 +408,22 @@ public class HoodieTableSource implements
     final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
 
     final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
-    if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
-      final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
-      boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
-      if (this.conf.getBoolean(FlinkOptions.CDC_ENABLED)) {
-        return cdcInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList());
-      } else {
-        return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
-            rowDataType, Collections.emptyList(), emitDelete);
-      }
+    switch (queryType) {
+      case FlinkOptions.QUERY_TYPE_SNAPSHOT:
+      case FlinkOptions.QUERY_TYPE_INCREMENTAL:
+        final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
+        boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
+        if (this.conf.getBoolean(FlinkOptions.CDC_ENABLED)) {
+          return cdcInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList());
+        } else {
+          return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
+              rowDataType, Collections.emptyList(), emitDelete);
+        }
+      default:
+        String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s'] are supported now", queryType,
+            FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_INCREMENTAL);
+        throw new HoodieException(errMsg);
     }
-    String errMsg = String.format("Invalid query type : '%s', options ['%s'] are supported now", queryType,
-        FlinkOptions.QUERY_TYPE_SNAPSHOT);
-    throw new HoodieException(errMsg);
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index f831910de9b..10f2bcd095a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1258,6 +1258,44 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result, TestData.dataSetInsert(5, 6));
   }
 
+  @Test
+  void testReadChangelogIncremental() throws Exception {
+    TableEnvironment tableEnv = streamTableEnv;
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); // for batch upsert
+    conf.setBoolean(FlinkOptions.CDC_ENABLED, true);
+
+    // write 3 batches of the same data set
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+
+    String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.READ_START_COMMIT, latestCommit)
+        .option(FlinkOptions.CDC_ENABLED, true)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result1, TestData.dataSetUpsert(2, 1));
+
+    // write another 10 batches of dataset
+    for (int i = 0; i < 10; i++) {
+      TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+    }
+
+    String firstCommit = TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
+    final String query = String.format("select count(*) from t1/*+ options('read.start-commit'='%s')*/", firstCommit);
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery(query).execute().collect());
+    assertRowsEquals(result2.subList(result2.size() - 2, result2.size()), "[-U[1], +U[2]]");
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws Exception {
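
The new test above drives the same feature through a dynamic table hint; a condensed sketch of that pattern follows. The table name 't1', the startCommit value, and the surrounding TableEnvironment are assumptions for illustration.

    import org.apache.flink.table.api.TableEnvironment;

    public class CdcHintQuerySketch {
      // Assumes 'tableEnv' is a streaming-mode TableEnvironment and 't1' is a
      // Hudi table created with 'cdc.enabled' = 'true'.
      static void queryChangelogSince(TableEnvironment tableEnv, String startCommit) {
        String query = String.format(
            "SELECT * FROM t1 /*+ OPTIONS('read.start-commit'='%s') */", startCommit);
        // Emits the +I/-U/+U/-D rows accumulated since startCommit, then finishes
        // (bounded source).
        tableEnv.sqlQuery(query).execute().print();
      }
    }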
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index affb92e8846..6d0bf731ccc 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -738,7 +738,93 @@ public class TestInputFormat {
     conf.setString(FlinkOptions.READ_END_COMMIT, "002");
     this.tableSource = getTableSource(conf);
     InputFormat<RowData, ?> inputFormat6 = this.tableSource.getInputFormat();
-    assertThat(inputFormat6, instanceOf(MergeOnReadInputFormat.class));
+
+    List<RowData> actual6 = readData(inputFormat6);
+    TestData.assertRowDataEquals(actual6, Collections.emptyList());
+  }
+
+  @Test
+  void testReadChangelogIncrementally() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_INCREMENTAL);
+    options.put(FlinkOptions.CDC_ENABLED.key(), "true");
+    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true"); // for batch update
+    beforeEach(HoodieTableType.COPY_ON_WRITE, options);
+
+    // write 3 commits first
+    // write the same dataset 3 times to generate changelog
+    for (int i = 0; i < 3; i++) {
+      List<RowData> dataset = TestData.dataSetInsert(1, 2);
+      TestData.writeDataAsBatch(dataset, conf);
+    }
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(conf));
+    List<String> commits = metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream()
+        .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+    assertThat(commits.size(), is(3));
+
+    // only the start commit
+    conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(1));
+    this.tableSource = getTableSource(conf);
+    InputFormat<RowData, ?> inputFormat1 = this.tableSource.getInputFormat();
+    assertThat(inputFormat1, instanceOf(CdcInputFormat.class));
+
+    List<RowData> actual1 = readData(inputFormat1);
+    final List<RowData> expected1 = TestData.dataSetUpsert(2, 1, 2, 1);
+    TestData.assertRowDataEquals(actual1, expected1);
+
+    // only the start commit: earliest
+    conf.setString(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+    this.tableSource = getTableSource(conf);
+    InputFormat<RowData, ?> inputFormat2 = this.tableSource.getInputFormat();
+    assertThat(inputFormat2, instanceOf(CdcInputFormat.class));
+
+    List<RowData> actual2 = readData(inputFormat2);
+    final List<RowData> expected2 = TestData.dataSetInsert(1, 2);
+    TestData.assertRowDataEquals(actual2, expected2);
+
+    // start and end commit: [start commit, end commit]
+    conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(0));
+    conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
+    this.tableSource = getTableSource(conf);
+    InputFormat<RowData, ?> inputFormat3 = this.tableSource.getInputFormat();
+    assertThat(inputFormat3, instanceOf(CdcInputFormat.class));
+
+    List<RowData> actual3 = readData(inputFormat3);
+    final List<RowData> expected3 = new ArrayList<>(TestData.dataSetInsert(1));
+    expected3.addAll(TestData.dataSetUpsert(1));
+    expected3.addAll(TestData.dataSetInsert(2));
+    expected3.addAll(TestData.dataSetUpsert(2));
+    TestData.assertRowDataEquals(actual3, expected3);
+
+    // only the end commit: point in time query
+    conf.removeConfig(FlinkOptions.READ_START_COMMIT);
+    conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
+    this.tableSource = getTableSource(conf);
+    InputFormat<RowData, ?> inputFormat4 = this.tableSource.getInputFormat();
+    assertThat(inputFormat4, instanceOf(CdcInputFormat.class));
+
+    List<RowData> actual4 = readData(inputFormat4);
+    final List<RowData> expected4 = TestData.dataSetUpsert(2, 1);
+    TestData.assertRowDataEquals(actual4, expected4);
+
+    // start and end commit: start commit out of range
+    conf.setString(FlinkOptions.READ_START_COMMIT, "000");
+    conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
+    this.tableSource = getTableSource(conf);
+    InputFormat<RowData, ?> inputFormat5 = this.tableSource.getInputFormat();
+    assertThat(inputFormat5, instanceOf(CdcInputFormat.class));
+
+    List<RowData> actual5 = readData(inputFormat5);
+    final List<RowData> expected5 = TestData.dataSetInsert(1, 2);
+    TestData.assertRowDataEquals(actual5, expected5);
+
+    // start and end commit: both are out of range
+    conf.setString(FlinkOptions.READ_START_COMMIT, "001");
+    conf.setString(FlinkOptions.READ_END_COMMIT, "002");
+    this.tableSource = getTableSource(conf);
+    InputFormat<RowData, ?> inputFormat6 = this.tableSource.getInputFormat();
 
     List<RowData> actual6 = readData(inputFormat6);
     TestData.assertRowDataEquals(actual6, Collections.emptyList());
@@ -836,16 +922,16 @@ public class TestInputFormat {
     final List<RowData> expected3 = TestData.dataSetInsert(3, 4);
     TestData.assertRowDataEquals(actual3, expected3);
 
-    // start and end commit: start is archived and cleaned, end is active
+    // start and end commit: start is archived and cleaned, end is active and cleaned
     conf.setString(FlinkOptions.READ_START_COMMIT, archivedCommits.get(1));
     conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(0));
     this.tableSource = getTableSource(conf);
     InputFormat<RowData, ?> inputFormat4 = this.tableSource.getInputFormat();
-    assertThat(inputFormat4, instanceOf(MergeOnReadInputFormat.class));
+    // assertThat(inputFormat4, instanceOf(MergeOnReadInputFormat.class));
 
     List<RowData> actual4 = readData(inputFormat4);
-    final List<RowData> expected4 = TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
-    TestData.assertRowDataEquals(actual4, expected4);
+    // final List<RowData> expected4 = TestData.dataSetInsert(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+    TestData.assertRowDataEquals(actual4, Collections.emptyList());
   }
 
   @ParameterizedTest
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 7d798fff9f1..f05e97cad00 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -366,6 +366,19 @@ public class TestData {
     return inserts;
   }
 
+  public static List<RowData> dataSetUpsert(int... ids) {
+    List<RowData> inserts = new ArrayList<>();
+    Arrays.stream(ids).forEach(i -> {
+      inserts.add(
+          updateBeforeRow(StringData.fromString("id" + i), StringData.fromString("Danny"), 23,
+              TimestampData.fromEpochMillis(i), StringData.fromString("par1")));
+      inserts.add(
+          updateAfterRow(StringData.fromString("id" + i), StringData.fromString("Danny"), 23,
+              TimestampData.fromEpochMillis(i), StringData.fromString("par1")));
+    });
+    return inserts;
+  }
+
   public static List<RowData> filterOddRows(List<RowData> rows) {
     return filterRowsByIndexPredicate(rows, i -> i % 2 != 0);
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
index 33e9d376588..da0761a7542 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.utils.factory;
 
+import org.apache.hudi.util.ChangelogModes;
+
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -37,7 +39,6 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -50,7 +51,7 @@ import java.util.Set;
  * Factory for CollectTableSink.
  *
  * <p>Note: The CollectTableSink collects all the data of a table into a global collection {@code RESULT},
- * so the tests should executed in single thread and the table name should be the same.
+ * so the tests should execute in a single thread and the table name should be the same.
  */
 public class CollectSinkTableFactory implements DynamicTableSinkFactory {
   public static final String FACTORY_ID = "collect";
@@ -104,11 +105,7 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
 
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-      return ChangelogMode.newBuilder()
-          .addContainedKind(RowKind.INSERT)
-          .addContainedKind(RowKind.DELETE)
-          .addContainedKind(RowKind.UPDATE_AFTER)
-          .build();
+      return ChangelogModes.FULL;
     }
 
     @Override
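
The sink factory now reuses ChangelogModes.FULL rather than hand-building a mode. The sketch below builds a full changelog mode directly with Flink's API; that it matches ChangelogModes.FULL exactly is my assumption, but the relevant difference from the removed code is UPDATE_BEFORE, which the previous hand-built mode left out and which CDC reads emit.

    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.types.RowKind;

    public class FullChangelogModeSketch {
      public static void main(String[] args) {
        // All four row kinds, including UPDATE_BEFORE.
        ChangelogMode full = ChangelogMode.newBuilder()
            .addContainedKind(RowKind.INSERT)
            .addContainedKind(RowKind.UPDATE_BEFORE)
            .addContainedKind(RowKind.UPDATE_AFTER)
            .addContainedKind(RowKind.DELETE)
            .build();
        System.out.println(full);
      }
    }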