You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/10/02 01:08:12 UTC

[hudi] branch master updated: [HUDI-4769] Option read.streaming.skip_compaction skips delta commit (#6848)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f171496de2 [HUDI-4769] Option read.streaming.skip_compaction skips delta commit (#6848)
f171496de2 is described below

commit f171496de244992958fd3fd22fbcd2a7dc62c7a2
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sun Oct 2 09:08:03 2022 +0800

    [HUDI-4769] Option read.streaming.skip_compaction skips delta commit (#6848)
---
 .../table/view/AbstractTableFileSystemView.java    |  50 ++++++
 .../apache/hudi/source/IncrementalInputSplits.java | 178 +++++++++------------
 .../hudi/source/StreamReadMonitoringFunction.java  |   5 +-
 .../apache/hudi/table/format/TestInputFormat.java  |  70 +++++++-
 4 files changed, 197 insertions(+), 106 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index ed4bfd7601..976217ae07 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -713,6 +713,33 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
     }
   }
 
+  /**
+   * Stream all "merged" file-slices before or on an instant time
+   * for a MERGE_ON_READ table with an index that can index log files (which means it writes pure logs first).
+   *
+   * <p>In streaming read scenario, in order for better reading efficiency, the user can choose to skip the
+   * base files that are produced by compaction. That is to say, we allow the users to consume only from
+   * these partitioned log files; these log files keep the record sequence just like a normal message queue.
+   *
+   * <p>NOTE: only local view is supported.
+   *
+   * @param partitionStr   Partition Path
+   * @param maxInstantTime Max Instant Time
+   */
+  public final Stream<FileSlice> getAllLogsMergedFileSliceBeforeOrOn(String partitionStr, String maxInstantTime) {
+    try {
+      readLock.lock();
+      String partition = formatPartitionKey(partitionStr);
+      ensurePartitionLoadedCorrectly(partition);
+      return fetchAllStoredFileGroups(partition)
+          .filter(fg -> !isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxInstantTime))
+          .map(fileGroup -> fetchAllLogsMergedFileSlice(fileGroup, maxInstantTime))
+          .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public final Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn) {
     try {
@@ -1076,6 +1103,29 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
     return fileSlice;
   }
 
+  /**
+   * Returns the file slice with all the file slice log files merged.
+   *
+   * @param fileGroup File Group to which the file slice belongs
+   * @param maxInstantTime The max instant time
+   */
+  private Option<FileSlice> fetchAllLogsMergedFileSlice(HoodieFileGroup fileGroup, String maxInstantTime) {
+    List<FileSlice> fileSlices = fileGroup.getAllFileSlicesBeforeOn(maxInstantTime).collect(Collectors.toList());
+    if (fileSlices.size() == 0) {
+      return Option.empty();
+    }
+    if (fileSlices.size() == 1) {
+      return Option.of(fileSlices.get(0));
+    }
+    final FileSlice latestSlice = fileSlices.get(0);
+    FileSlice merged = new FileSlice(latestSlice.getPartitionPath(), latestSlice.getBaseInstantTime(),
+        latestSlice.getFileId());
+
+    // add log files from the latest slice to the earliest
+    fileSlices.forEach(slice -> slice.getLogFiles().forEach(merged::addLogFile));
+    return Option.of(merged);
+  }
+
   /**
    * Default implementation for fetching latest base-file.
    *
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index dc10970b05..09f7054cd7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -20,6 +20,7 @@ package org.apache.hudi.source;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -220,7 +221,7 @@ public class IncrementalInputSplits implements Serializable {
         : instants.get(instants.size() - 1).getTimestamp();
 
     List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
-        fileStatuses, readPartitions, endInstant, instantRange);
+        fileStatuses, readPartitions, endInstant, instantRange, false);
 
     return Result.instance(inputSplits, endInstant);
   }
@@ -235,8 +236,9 @@ public class IncrementalInputSplits implements Serializable {
    */
   public Result inputSplits(
       HoodieTableMetaClient metaClient,
-      org.apache.hadoop.conf.Configuration hadoopConf,
-      String issuedInstant) {
+      @Nullable org.apache.hadoop.conf.Configuration hadoopConf,
+      String issuedInstant,
+      boolean cdcEnabled) {
     metaClient.reloadActiveTimeline();
     HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
     if (commitTimeline.empty()) {
@@ -248,90 +250,15 @@ public class IncrementalInputSplits implements Serializable {
     final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
     final InstantRange instantRange;
     if (instantToIssue != null) {
-      instantRange = getInstantRange(issuedInstant, instantToIssue.getTimestamp(), false);
+      // when cdc is enabled, returns instant range with nullable boundary
+      // to filter the reading instants on the timeline
+      instantRange = getInstantRange(issuedInstant, instantToIssue.getTimestamp(), cdcEnabled);
     } else {
       LOG.info("No new instant found for the table under path " + path + ", skip reading");
       return Result.EMPTY;
     }
 
-    String tableName = conf.getString(FlinkOptions.TABLE_NAME);
-
-    Set<String> readPartitions;
-    final FileStatus[] fileStatuses;
-
-    if (instantRange == null) {
-      // reading from the earliest, scans the partitions and files directly.
-      FileIndex fileIndex = getFileIndex();
-      readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
-      if (readPartitions.size() == 0) {
-        LOG.warn("No partitions found for reading in user provided path.");
-        return Result.EMPTY;
-      }
-      fileStatuses = fileIndex.getFilesInPartitions();
-    } else {
-      List<HoodieCommitMetadata> activeMetadataList = instants.stream()
-          .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
-      List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
-      if (archivedMetadataList.size() > 0) {
-        LOG.warn("\n"
-            + "--------------------------------------------------------------------------------\n"
-            + "---------- caution: the reader has fall behind too much from the writer,\n"
-            + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
-            + "--------------------------------------------------------------------------------");
-      }
-      List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
-          // IMPORTANT: the merged metadata list must be in ascending order by instant time
-          ? mergeList(archivedMetadataList, activeMetadataList)
-          : activeMetadataList;
-
-      readPartitions = getReadPartitions(metadataList);
-      if (readPartitions.size() == 0) {
-        LOG.warn("No partitions found for reading in user provided path.");
-        return Result.EMPTY;
-      }
-      fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
-    }
-
-    if (fileStatuses.length == 0) {
-      LOG.warn("No files found for reading in user provided path.");
-      return Result.EMPTY;
-    }
-
-    final String endInstant = instantToIssue.getTimestamp();
-    List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
-        fileStatuses, readPartitions, endInstant, instantRange);
-
-    return Result.instance(inputSplits, endInstant);
-  }
-
-  /**
-   * Returns the incremental cdc input splits.
-   *
-   * @param metaClient    The meta client
-   * @param issuedInstant The last issued instant, only valid in streaming read
-   * @return The list of incremental input splits or empty if there are no new instants
-   */
-  public Result inputSplitsCDC(
-      HoodieTableMetaClient metaClient,
-      String issuedInstant) {
-    metaClient.reloadActiveTimeline();
-    HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
-    if (commitTimeline.empty()) {
-      LOG.warn("No splits found for the table under path " + path);
-      return Result.EMPTY;
-    }
-    List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, issuedInstant);
-    // get the latest instant that satisfies condition
-    final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
-    final InstantRange instantRange;
-    if (instantToIssue != null) {
-      instantRange = getInstantRange(issuedInstant, instantToIssue.getTimestamp(), true);
-    } else {
-      LOG.info("No new instant found for the table under path " + path + ", skip reading");
-      return Result.EMPTY;
-    }
-
-    Set<String> readPartitions;
+    final Set<String> readPartitions;
     final FileStatus[] fileStatuses;
 
     if (instantRange == null) {
@@ -339,38 +266,77 @@ public class IncrementalInputSplits implements Serializable {
       FileIndex fileIndex = getFileIndex();
       readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
       if (readPartitions.size() == 0) {
-        LOG.warn("No partitions found for reading in path: " + path);
+        LOG.warn("No partitions found for reading under path: " + path);
         return Result.EMPTY;
       }
       fileStatuses = fileIndex.getFilesInPartitions();
 
       if (fileStatuses.length == 0) {
-        LOG.warn("No files found for reading in path: " + path);
+        LOG.warn("No files found for reading under path: " + path);
         return Result.EMPTY;
       }
 
       final String endInstant = instantToIssue.getTimestamp();
       List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
-          fileStatuses, readPartitions, endInstant, null);
+          fileStatuses, readPartitions, endInstant, null, false);
 
       return Result.instance(inputSplits, endInstant);
     } else {
-      HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
-      final String endInstant = instantToIssue.getTimestamp();
-      Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
+      // streaming read
+      if (cdcEnabled) {
+        // case1: cdc change log enabled
+        HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
+        final String endInstant = instantToIssue.getTimestamp();
+        Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
+
+        if (fileSplits.isEmpty()) {
+          LOG.warn("No change logs found for reading in path: " + path);
+          return Result.EMPTY;
+        }
 
-      if (fileSplits.isEmpty()) {
-        LOG.warn("No change logs found for reading in path: " + path);
-        return Result.EMPTY;
-      }
+        final AtomicInteger cnt = new AtomicInteger(0);
+        List<MergeOnReadInputSplit> inputSplits = fileSplits.entrySet().stream()
+            .map(splits ->
+                new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), maxCompactionMemoryInBytes,
+                    splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
+            .collect(Collectors.toList());
+        return Result.instance(inputSplits, endInstant);
+      } else {
+        // case2: normal streaming read
+        String tableName = conf.getString(FlinkOptions.TABLE_NAME);
+        List<HoodieCommitMetadata> activeMetadataList = instants.stream()
+            .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
+        List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
+        if (archivedMetadataList.size() > 0) {
+          LOG.warn("\n"
+              + "--------------------------------------------------------------------------------\n"
+              + "---------- caution: the reader has fall behind too much from the writer,\n"
+              + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+              + "--------------------------------------------------------------------------------");
+        }
+        List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
+            // IMPORTANT: the merged metadata list must be in ascending order by instant time
+            ? mergeList(archivedMetadataList, activeMetadataList)
+            : activeMetadataList;
 
-      final AtomicInteger cnt = new AtomicInteger(0);
-      List<MergeOnReadInputSplit> inputSplits = fileSplits.entrySet().stream()
-          .map(splits ->
-              new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), maxCompactionMemoryInBytes,
-                  splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
-          .collect(Collectors.toList());
-      return Result.instance(inputSplits, endInstant);
+        readPartitions = getReadPartitions(metadataList);
+        if (readPartitions.size() == 0) {
+          LOG.warn("No partitions found for reading under path: " + path);
+          return Result.EMPTY;
+        }
+        fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
+
+        if (fileStatuses.length == 0) {
+          LOG.warn("No files found for reading under path: " + path);
+          return Result.EMPTY;
+        }
+
+        final String endInstant = instantToIssue.getTimestamp();
+        List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
+            fileStatuses, readPartitions, endInstant, instantRange, skipCompaction);
+
+        return Result.instance(inputSplits, endInstant);
+      }
     }
   }
 
@@ -401,12 +367,13 @@ public class IncrementalInputSplits implements Serializable {
       FileStatus[] fileStatuses,
       Set<String> readPartitions,
       String endInstant,
-      InstantRange instantRange) {
+      InstantRange instantRange,
+      boolean skipBaseFiles) {
     final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
     final AtomicInteger cnt = new AtomicInteger(0);
     final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
     return readPartitions.stream()
-        .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
+        .map(relPartitionPath -> getFileSlices(fsView, relPartitionPath, endInstant, skipBaseFiles)
             .map(fileSlice -> {
               Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
                   .sorted(HoodieLogFile.getLogFileComparator())
@@ -421,6 +388,15 @@ public class IncrementalInputSplits implements Serializable {
         .collect(Collectors.toList());
   }
 
+  private static Stream<FileSlice> getFileSlices(
+      HoodieTableFileSystemView fsView,
+      String relPartitionPath,
+      String endInstant,
+      boolean skipBaseFiles) {
+    return skipBaseFiles ? fsView.getAllLogsMergedFileSliceBeforeOrOn(relPartitionPath, endInstant)
+        : fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant);
+  }
+
   private FileIndex getFileIndex() {
     FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
     if (this.requiredPartitions != null) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 1812262ab4..2ad312241e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -201,9 +201,8 @@ public class StreamReadMonitoringFunction
       // table does not exist
       return;
     }
-    IncrementalInputSplits.Result result = cdcEnabled
-        ? incrementalInputSplits.inputSplitsCDC(metaClient, this.issuedInstant)
-        : incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
+    IncrementalInputSplits.Result result =
+        incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant, this.cdcEnabled);
     if (result.isEmpty()) {
       // no new instants, returns early
       return;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index f066a0b702..458e024446 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -37,6 +37,7 @@ import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
 
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.configuration.Configuration;
@@ -63,6 +64,7 @@ import java.util.stream.Collectors;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 /**
  * Test cases for MergeOnReadInputFormat and ParquetInputFormat.
@@ -341,7 +343,7 @@ public class TestInputFormat {
         .build();
 
     // default read the latest commit
-    IncrementalInputSplits.Result splits = incrementalInputSplits.inputSplitsCDC(metaClient, null);
+    IncrementalInputSplits.Result splits = incrementalInputSplits.inputSplits(metaClient, null, null, true);
 
     List<RowData> result = readData(inputFormat, splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
 
@@ -383,7 +385,7 @@ public class TestInputFormat {
         .build();
 
     // default read the latest commit
-    IncrementalInputSplits.Result splits = incrementalInputSplits.inputSplitsCDC(metaClient, null);
+    IncrementalInputSplits.Result splits = incrementalInputSplits.inputSplits(metaClient, null, null, true);
 
     List<RowData> result = readData(inputFormat, splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
 
@@ -398,6 +400,70 @@ public class TestInputFormat {
     assertThat(actual, is(expected));
   }
 
+  @Test
+  void testReadSkipCompaction() throws Exception {
+    beforeEach(HoodieTableType.MERGE_ON_READ);
+
+    org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
+
+    // write base first with compaction
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat(true);
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
+        .rowType(TestConfigurations.ROW_TYPE)
+        .conf(conf)
+        .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+        .requiredPartitions(new HashSet<>(Arrays.asList("par1", "par2", "par3", "par4")))
+        .skipCompaction(true)
+        .build();
+
+    // default read the latest commit
+    // the compaction base files are skipped
+    IncrementalInputSplits.Result splits1 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    assertFalse(splits1.isEmpty());
+    List<RowData> result1 = readData(inputFormat, splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+
+    String actual1 = TestData.rowDataToString(result1);
+    String expected1 = TestData.rowDataToString(TestData.DATA_SET_INSERT);
+    assertThat(actual1, is(expected1));
+
+    // write another commit using logs and read again
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+    // read from the compaction commit
+    String secondCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, false);
+    conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
+
+    IncrementalInputSplits.Result splits2 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    assertFalse(splits2.isEmpty());
+    List<RowData> result2 = readData(inputFormat, splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    String actual2 = TestData.rowDataToString(result2);
+    String expected2 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+    assertThat(actual2, is(expected2));
+
+    // write another commit using logs with separate partition
+    // so the file group has only logs
+    TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+
+    // refresh the input format
+    this.tableSource.reset();
+    inputFormat = this.tableSource.getInputFormat(true);
+
+    IncrementalInputSplits.Result splits3 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    assertFalse(splits3.isEmpty());
+    List<RowData> result3 = readData(inputFormat, splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    String actual3 = TestData.rowDataToString(result3);
+    String expected3 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+    assertThat(actual3, is(expected3));
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {