Posted to commits@hudi.apache.org by ga...@apache.org on 2021/06/15 15:18:53 UTC

[hudi] branch master updated: [HUDI-1999] Refresh the base file view cache for WriteProfile (#3067)

This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cb642ce  [HUDI-1999] Refresh the base file view cache for WriteProfile (#3067)
cb642ce is described below

commit cb642ceb75ef903a77593af943c6ba19053257ee
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Tue Jun 15 23:18:38 2021 +0800

    [HUDI-1999] Refresh the base file view cache for WriteProfile (#3067)
    
    Refresh the view to discover new small files.
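
For illustration, below is a minimal, self-contained sketch of the caching pattern this patch introduces. The names (CachedViewSketch, FileSystemView, smallFiles) are hypothetical stand-ins, not the actual Hudi classes: a view over the committed base files is built lazily, cached for one checkpoint interval, and dropped on reload so the next small-file lookup can see newly committed files.

    import java.util.Arrays;
    import java.util.List;

    /**
     * Minimal sketch of the pattern introduced by this patch, with hypothetical
     * stand-in types instead of the real Hudi classes: the view over committed
     * base files is built lazily, cached for one checkpoint interval, and
     * invalidated on reload so newly written small files become visible.
     */
    public class CachedViewSketch {

      /** Stand-in for HoodieTableFileSystemView: a snapshot of base file names. */
      static final class FileSystemView {
        final List<String> baseFiles;

        FileSystemView(List<String> baseFiles) {
          this.baseFiles = baseFiles;
        }
      }

      private FileSystemView fsView;            // cached for one checkpoint interval
      private long reloadedCheckpointId = -1L;  // guards against repeated reloads

      /** Lazily build the view, in the spirit of WriteProfile#initFSViewIfNecessary. */
      private FileSystemView viewIfNecessary(List<String> committedFiles) {
        if (fsView == null) {
          fsView = new FileSystemView(committedFiles);
        }
        return fsView;
      }

      /** Small-file candidates are always resolved through the cached view. */
      public List<String> smallFiles(List<String> committedFiles) {
        return viewIfNecessary(committedFiles).baseFiles;
      }

      /** Invalidate the cache once per checkpoint, in the spirit of WriteProfile#reload. */
      public void reload(long checkpointId) {
        if (checkpointId <= reloadedCheckpointId) {
          return;                               // already refreshed for this checkpoint
        }
        this.fsView = null;                     // drop the stale view
        this.reloadedCheckpointId = checkpointId;
      }

      public static void main(String[] args) {
        CachedViewSketch profile = new CachedViewSketch();
        System.out.println(profile.smallFiles(Arrays.asList("f1_c1.parquet")));  // [f1_c1.parquet]
        // a new commit lands; without a reload the cached view still serves the old snapshot
        System.out.println(profile.smallFiles(Arrays.asList("f1_c2.parquet")));  // [f1_c1.parquet]
        profile.reload(1L);
        System.out.println(profile.smallFiles(Arrays.asList("f1_c2.parquet")));  // [f1_c2.parquet]
      }
    }

In the actual change below, the cached HoodieTableFileSystemView is rebuilt from the commit metadata on the timeline (WriteProfile#initFSViewIfNecessary) and cleared in WriteProfile#reload.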
---
 .../apache/hudi/io/FlinkMergeAndReplaceHandle.java | 12 ++-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  4 +-
 .../partitioner/profile/DeltaWriteProfile.java     |  5 +-
 .../sink/partitioner/profile/WriteProfile.java     | 37 ++++++++-
 .../sink/partitioner/profile/WriteProfiles.java    | 94 ++++++++++++++++++++++
 .../hudi/source/StreamReadMonitoringFunction.java  | 58 +------------
 .../hudi/sink/partitioner/TestBucketAssigner.java  | 48 ++++++++++-
 7 files changed, 197 insertions(+), 61 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
index c87f3dd..f414450 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -55,6 +55,11 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
 
   private boolean isClosed = false;
 
+  /**
+   * Flag saying whether we should replace the old file with the new one.
+   */
+  private boolean shouldReplace = true;
+
   public FlinkMergeAndReplaceHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                                     Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                                     TaskContextSupplier taskContextSupplier, Path basePath) {
@@ -103,11 +108,12 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
   @Override
   protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
     // old and new file name expects to be the same.
-    if (!oldFileName.equals(newFileName)) {
+    if (!FSUtils.getCommitTime(oldFileName).equals(instantTime)) {
       LOG.warn("MERGE and REPLACE handle expect the same name for old and new files,\n"
           + "while got new file: " + newFileName + " with old file: " + oldFileName + ",\n"
           + "this rarely happens when the checkpoint success event was not received yet\n"
           + "but the write task flush with new instant time, which does not break the UPSERT semantics");
+      shouldReplace = false;
     }
     super.makeOldAndNewFilePaths(partitionPath, oldFileName, newFileName);
     try {
@@ -146,6 +152,10 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
   }
 
   public void finalizeWrite() {
+    // Behaves like the normal merge handle if the write instant time changes.
+    if (!shouldReplace) {
+      return;
+    }
     // The file visibility should be kept by the configured ConsistencyGuard instance.
     try {
       fs.delete(oldFilePath, false);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index ddbb3dd..4963d31 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -206,11 +206,11 @@ public class StreamWriteOperatorCoordinator
             }
             // start new instant.
             startInstant();
+            // sync Hive if it is enabled
+            syncHiveIfEnabled();
           }
         }, "commits the instant %s", this.instant
     );
-    // sync Hive if is enabled
-    syncHiveIfEnabled();
   }
 
   private void syncHiveIfEnabled() {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index 8f8c692..9f56bdd 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -54,12 +54,13 @@ public class DeltaWriteProfile extends WriteProfile {
     // Find out all eligible small file slices
     if (!commitTimeline.empty()) {
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+      // initialize the filesystem view based on the commit metadata
+      initFSViewIfNecessary(commitTimeline);
       // find smallest file in partition and append to it
       List<FileSlice> allSmallFileSlices = new ArrayList<>();
       // If we can index log files, we can add more inserts to log files for fileIds including those under
       // pending compaction.
-      List<FileSlice> allFileSlices =
-          table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
+      List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
               .collect(Collectors.toList());
       for (FileSlice fileSlice : allFileSlices) {
         if (isSmallFile(fileSlice)) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index 3aee176..71d0d83 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -25,12 +25,17 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.partitioner.BucketAssigner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.core.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,6 +61,11 @@ public class WriteProfile {
   protected final HoodieWriteConfig config;
 
   /**
+   * Table base path.
+   */
+  private final Path basePath;
+
+  /**
    * The hoodie table.
    */
   protected final HoodieTable<?, ?, ?, ?> table;
@@ -81,11 +91,23 @@ public class WriteProfile {
    */
   private long reloadedCheckpointId;
 
+  /**
+   * The file system view cache for one checkpoint interval.
+   */
+  protected HoodieTableFileSystemView fsView;
+
+  /**
+   * Hadoop configuration.
+   */
+  private final Configuration hadoopConf;
+
   public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
     this.config = config;
+    this.basePath = new Path(config.getBasePath());
     this.smallFilesMap = new HashMap<>();
     this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
     this.table = HoodieFlinkTable.create(config, context);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
     // profile the record statistics on construction
     recordProfile();
   }
@@ -160,7 +182,9 @@ public class WriteProfile {
 
     if (!commitTimeline.empty()) { // if we have some commits
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
+      // initialize the filesystem view based on the commit metadata
+      initFSViewIfNecessary(commitTimeline);
+      List<HoodieBaseFile> allFiles = fsView
           .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
 
       for (HoodieBaseFile file : allFiles) {
@@ -178,6 +202,16 @@ public class WriteProfile {
     return smallFileLocations;
   }
 
+  protected void initFSViewIfNecessary(HoodieTimeline commitTimeline) {
+    if (fsView == null) {
+      List<HoodieCommitMetadata> metadataList = commitTimeline.getInstants()
+          .map(instant -> WriteProfiles.getCommitMetadata(config.getTableName(), basePath, instant, commitTimeline))
+          .collect(Collectors.toList());
+      FileStatus[] commitFiles = WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList);
+      fsView = new HoodieTableFileSystemView(table.getMetaClient(), commitTimeline, commitFiles);
+    }
+  }
+
   private void recordProfile() {
     this.avgSize = averageBytesPerRecord();
     if (config.shouldAllowMultiWriteOnSameInstant()) {
@@ -200,6 +234,7 @@ public class WriteProfile {
       return;
     }
     recordProfile();
+    this.fsView = null;
     this.smallFilesMap.clear();
     this.table.getMetaClient().reloadActiveTimeline();
     this.reloadedCheckpointId = checkpointId;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index cf99ab3..9a8b7d0 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -19,15 +19,33 @@
 package org.apache.hudi.sink.partitioner.profile;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 
+import org.apache.flink.core.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Factory for {@link WriteProfile}.
  */
 public class WriteProfiles {
+  private static final Logger LOG = LoggerFactory.getLogger(WriteProfiles.class);
+
   private static final Map<String, WriteProfile> PROFILES = new HashMap<>();
 
   private WriteProfiles() {}
@@ -58,4 +76,80 @@ public class WriteProfiles {
   public static void clean(String path) {
     PROFILES.remove(path);
   }
+
+  /**
+   * Returns the file statuses of all incremental write paths for the given commit metadata list.
+   *
+   * @param basePath     Table base path
+   * @param hadoopConf   The hadoop conf
+   * @param metadataList The commit metadata list
+   * @return the file statuses array
+   */
+  public static FileStatus[] getWritePathsOfInstants(
+      Path basePath,
+      Configuration hadoopConf,
+      List<HoodieCommitMetadata> metadataList) {
+    FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+    return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs))
+        .flatMap(Collection::stream).toArray(FileStatus[]::new);
+  }
+
+  /**
+   * Returns the commit file statuses for the given metadata.
+   *
+   * @param basePath Table base path
+   * @param metadata The metadata
+   * @param fs       The filesystem
+   *
+   * @return the commit file status list
+   */
+  private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
+    return metadata.getFileIdAndFullPaths(basePath.toString()).values().stream()
+        .map(org.apache.hadoop.fs.Path::new)
+        // filter out the file paths that do not exist; some files may have been
+        // cleaned by the cleaner.
+        .filter(path -> {
+          try {
+            return fs.exists(path);
+          } catch (IOException e) {
+            LOG.error("Failed to check the existence of path {}", path);
+            throw new HoodieException(e);
+          }
+        }).map(path -> {
+          try {
+            return fs.getFileStatus(path);
+          } catch (IOException e) {
+            LOG.error("Failed to fetch the file status of path {}", path);
+            throw new HoodieException(e);
+          }
+        })
+        // filter out corrupted (zero length) files
+        .filter(fileStatus -> fileStatus.getLen() > 0)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the commit metadata of the given instant.
+   *
+   * @param tableName The table name
+   * @param basePath  The table base path
+   * @param instant   The hoodie instant
+   * @param timeline  The timeline
+   *
+   * @return the commit metadata
+   */
+  public static HoodieCommitMetadata getCommitMetadata(
+      String tableName,
+      Path basePath,
+      HoodieInstant instant,
+      HoodieTimeline timeline) {
+    byte[] data = timeline.getInstantDetails(instant).get();
+    try {
+      return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+    } catch (IOException e) {
+      LOG.error("Failed to fetch commit metadata for table {} with instant {} under path {}",
+          tableName, instant.getTimestamp(), basePath);
+      throw new HoodieException(e);
+    }
+  }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 3a2de84..983c19f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.source;
 
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -30,7 +29,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -46,11 +45,9 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -220,10 +217,11 @@ public class StreamReadMonitoringFunction
     // 3. filter the full file paths
     // 4. use the file paths from #step 3 as the back-up of the filesystem view
 
+    String tableName = conf.getString(FlinkOptions.TABLE_NAME);
     List<HoodieCommitMetadata> metadataList = instants.stream()
-        .map(instant -> getCommitMetadata(instant, commitTimeline)).collect(Collectors.toList());
+        .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
     Set<String> writePartitions = getWritePartitionPaths(metadataList);
-    FileStatus[] fileStatuses = getWritePathsOfInstants(metadataList);
+    FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList);
     if (fileStatuses.length == 0) {
       LOG.warn("No files found for reading in user provided path.");
       return;
@@ -334,52 +332,4 @@ public class StreamReadMonitoringFunction
         .flatMap(Collection::stream)
         .collect(Collectors.toSet());
   }
-
-  /**
-   * Returns all the incremental write file path statuses with the given commits metadata.
-   *
-   * @param metadataList The commits metadata
-   * @return the file statuses array
-   */
-  private FileStatus[] getWritePathsOfInstants(List<HoodieCommitMetadata> metadataList) {
-    FileSystem fs = FSUtils.getFs(path.toString(), hadoopConf);
-    return metadataList.stream().map(metadata -> getWritePathsOfInstant(metadata, fs))
-        .flatMap(Collection::stream).toArray(FileStatus[]::new);
-  }
-
-  private List<FileStatus> getWritePathsOfInstant(HoodieCommitMetadata metadata, FileSystem fs) {
-    return metadata.getFileIdAndFullPaths(path.toString()).values().stream()
-        .map(org.apache.hadoop.fs.Path::new)
-        // filter out the file paths that does not exist, some files may be cleaned by
-        // the cleaner.
-        .filter(path -> {
-          try {
-            return fs.exists(path);
-          } catch (IOException e) {
-            LOG.error("Checking exists of path: {} error", path);
-            throw new HoodieException(e);
-          }
-        }).map(path -> {
-          try {
-            return fs.getFileStatus(path);
-          } catch (IOException e) {
-            LOG.error("Get write status of path: {} error", path);
-            throw new HoodieException(e);
-          }
-        })
-        // filter out crushed files
-        .filter(fileStatus -> fileStatus.getLen() > 0)
-        .collect(Collectors.toList());
-  }
-
-  private HoodieCommitMetadata getCommitMetadata(HoodieInstant instant, HoodieTimeline timeline) {
-    byte[] data = timeline.getInstantDetails(instant).get();
-    try {
-      return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
-    } catch (IOException e) {
-      LOG.error("Get write metadata for table {} with instant {} and path: {} error",
-          conf.getString(FlinkOptions.TABLE_NAME), instant.getTimestamp(), path);
-      throw new HoodieException(e);
-    }
-  }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 1c895b6..3efa444 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -22,6 +22,8 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.partitioner.profile.WriteProfile;
 import org.apache.hudi.table.action.commit.BucketInfo;
@@ -29,6 +31,7 @@ import org.apache.hudi.table.action.commit.BucketType;
 import org.apache.hudi.table.action.commit.SmallFile;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.BeforeEach;
@@ -45,6 +48,10 @@ import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for {@link BucketAssigner}.
@@ -52,6 +59,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 public class TestBucketAssigner {
   private HoodieWriteConfig writeConfig;
   private HoodieFlinkEngineContext context;
+  private Configuration conf;
 
   @TempDir
   File tempFile;
@@ -59,7 +67,7 @@ public class TestBucketAssigner {
   @BeforeEach
   public void before() throws IOException {
     final String basePath = tempFile.getAbsolutePath();
-    final Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf = TestConfigurations.getDefaultConf(basePath);
 
     writeConfig = StreamerUtil.getHoodieClientConfig(conf);
     context = new HoodieFlinkEngineContext(
@@ -291,6 +299,44 @@ public class TestBucketAssigner {
     assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1");
   }
 
+  @Test
+  public void testWriteProfileReload() throws Exception {
+    WriteProfile writeProfile = new WriteProfile(writeConfig, context);
+    List<SmallFile> smallFiles1 = writeProfile.getSmallFiles("par1");
+    assertTrue(smallFiles1.isEmpty(), "Should have no small files");
+
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    Option<String> instantOption = getLastCompleteInstant(writeProfile);
+    assertFalse(instantOption.isPresent());
+
+    writeProfile.reload(1);
+    String instant1 = getLastCompleteInstant(writeProfile).orElse(null);
+    assertNotNull(instant1);
+    List<SmallFile> smallFiles2 = writeProfile.getSmallFiles("par1");
+    assertThat("Should have 1 small file", smallFiles2.size(), is(1));
+    assertThat("Small file should have same timestamp as last complete instant",
+        smallFiles2.get(0).location.getInstantTime(), is(instant1));
+
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    List<SmallFile> smallFiles3 = writeProfile.getSmallFiles("par1");
+    assertThat("Should have 1 small file", smallFiles3.size(), is(1));
+    assertThat("Non-reloaded write profile has the same base file view as before",
+        smallFiles3.get(0).location.getInstantTime(), is(instant1));
+
+    writeProfile.reload(2);
+    String instant2 = getLastCompleteInstant(writeProfile).orElse(null);
+    assertNotEquals(instant2, instant1, "Should have new complete instant");
+    List<SmallFile> smallFiles4 = writeProfile.getSmallFiles("par1");
+    assertThat("Should have 1 small file", smallFiles4.size(), is(1));
+    assertThat("Small file should have same timestamp as last complete instant",
+        smallFiles4.get(0).location.getInstantTime(), is(instant2));
+  }
+
+  private static Option<String> getLastCompleteInstant(WriteProfile profile) {
+    return profile.getTable().getMetaClient().getCommitsTimeline()
+        .filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp);
+  }
+
   private void assertBucketEquals(
       BucketInfo bucketInfo,
       String partition,