Posted to commits@hudi.apache.org by da...@apache.org on 2021/10/21 04:36:55 UTC

[hudi] branch master updated: [HUDI-2583] Refactor TestWriteCopyOnWrite test cases (#3832)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new aa3c4ec  [HUDI-2583] Refactor TestWriteCopyOnWrite test cases (#3832)
aa3c4ec is described below

commit aa3c4ecda57c412b5822348d40cdf983c0035fb7
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Oct 21 12:36:41 2021 +0800

    [HUDI-2583] Refactor TestWriteCopyOnWrite test cases (#3832)
---
 .../sink/partitioner/profile/WriteProfile.java     |   8 -
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 964 +++++----------------
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java |  35 -
 .../hudi/sink/TestWriteMergeOnReadWithCompact.java |   6 -
 .../hudi/sink/utils/InsertFunctionWrapper.java     |   7 +-
 .../sink/utils/StreamWriteFunctionWrapper.java     |   2 +-
 .../hudi/sink/utils/TestFunctionWrapper.java       | 124 +++
 .../org/apache/hudi/sink/utils/TestWriteBase.java  | 425 +++++++++
 .../test/java/org/apache/hudi/utils/TestData.java  |  11 +-
 9 files changed, 776 insertions(+), 806 deletions(-)
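For orientation before reading the diff: this refactor replaces the imperative per-test bookkeeping around StreamWriteFunctionWrapper (openFunction/invoke/checkpointFunction/checkpointComplete plus manual assertions) with a fluent TestHarness DSL hosted in the new TestWriteBase. The sketch below is illustrative only and is not part of the commit; the class name ExampleFluentWriteTest is hypothetical, and the chained method names are taken from the diff itself (preparePipeline, consume, checkpoint, assertNextEvent, checkpointComplete, checkWrittenData, end), with exact signatures assumed rather than verified against the final sources.

// Illustrative sketch of a write test using the fluent TestHarness DSL
// introduced by this commit. Assumptions: TestHarness and EXPECTED1 are
// inherited from TestWriteBase as shown in the diff; signatures may differ.
package org.apache.hudi.sink;

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.TestWriteBase;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;

import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;

public class ExampleFluentWriteTest extends TestWriteBase {

  private Configuration conf;

  @TempDir
  File tempFile;

  @BeforeEach
  void before() {
    conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
    conf.setString(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE");
  }

  @Test
  void testInsert() throws Exception {
    // Each call returns the harness, so one chain covers the whole
    // ingest -> checkpoint -> commit -> verify cycle that the old tests
    // spelled out with funcWrapper.invoke()/checkpointFunction()/checkpointComplete().
    TestHarness.instance()
        .preparePipeline(tempFile, conf)    // open the write function
        .consume(TestData.DATA_SET_INSERT)  // buffer the first batch
        .checkpoint(1)                      // flush data and send the write metadata event
        .assertNextEvent()                  // coordinator receives the event
        .checkpointComplete(1)              // commit the instant
        .checkWrittenData(EXPECTED1)        // verify the written base files
        .end();
  }
}

The same chain is what the refactored testInsert/testUpsert bodies in TestWriteCopyOnWrite.java below reduce to, which is why the test file shrinks by roughly 800 lines while keeping the original coverage.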

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index d3de247..1171a54 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -31,11 +31,9 @@ import org.apache.hudi.sink.partitioner.BucketAssigner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.SmallFile;
-import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
-import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,11 +97,6 @@ public class WriteProfile {
   protected AbstractTableFileSystemView fsView;
 
   /**
-   * Hadoop configuration.
-   */
-  private final Configuration hadoopConf;
-
-  /**
    * Metadata cache to reduce IO of metadata files.
    */
   private final Map<String, HoodieCommitMetadata> metadataCache;
@@ -114,7 +107,6 @@ public class WriteProfile {
     this.smallFilesMap = new HashMap<>();
     this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
     this.table = HoodieFlinkTable.create(config, context);
-    this.hadoopConf = StreamerUtil.getHadoopConf();
     this.metadataCache = new HashMap<>();
     // profile the record statistics on construction
     recordProfile();
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 5b25311..d8588f8 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -19,93 +19,44 @@
 package org.apache.hudi.sink;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.utils.InsertFunctionWrapper;
-import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
+import org.apache.hudi.sink.utils.TestWriteBase;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
-import org.apache.hudi.utils.TestUtils;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.table.data.RowData;
-import org.hamcrest.MatcherAssert;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for stream write.
  */
-public class TestWriteCopyOnWrite {
-
-  protected static final Map<String, String> EXPECTED1 = new HashMap<>();
-
-  protected static final Map<String, String> EXPECTED2 = new HashMap<>();
-
-  protected static final Map<String, String> EXPECTED3 = new HashMap<>();
-
-  static {
-    EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
-    EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
-    EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
-    EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
-
-    EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
-    EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]");
-    EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, "
-        + "id9,par3,id9,Jane,19,6,par3]");
-    EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, "
-        + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
-
-    EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
-  }
+public class TestWriteCopyOnWrite extends TestWriteBase {
 
   protected Configuration conf;
 
-  protected StreamWriteFunctionWrapper<RowData> funcWrapper;
-
   @TempDir
   File tempFile;
 
   @BeforeEach
-  public void before() throws Exception {
+  public void before() {
     conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setString(FlinkOptions.TABLE_TYPE, getTableType().name());
     setUp(conf);
-    this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
   }
 
   /**
@@ -115,353 +66,148 @@ public class TestWriteCopyOnWrite {
     // for sub-class extension
   }
 
-  @AfterEach
-  public void after() throws Exception {
-    funcWrapper.close();
-  }
-
   @Test
   public void testCheckpoint() throws Exception {
-    // open the function and ingest data
-    funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_INSERT) {
-      funcWrapper.invoke(rowData);
-    }
-
-    // no checkpoint, so the coordinator does not accept any events
-    assertTrue(
-        funcWrapper.getEventBuffer().length == 1
-            && funcWrapper.getEventBuffer()[0] == null, "The coordinator events buffer expect to be empty");
-
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-
-    String instant = lastPendingInstant();
-
-    final OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-    List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
-    assertNotNull(writeStatuses);
-    MatcherAssert.assertThat(writeStatuses.size(), is(4)); // write 4 partition files
-    assertThat(writeStatuses.stream()
-            .map(WriteStatus::getPartitionPath).sorted(Comparator.naturalOrder())
-            .collect(Collectors.joining(",")),
-        is("par1,par2,par3,par4"));
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    checkInstantState(REQUESTED, instant);
-    funcWrapper.checkpointComplete(1);
-    // the coordinator checkpoint commits the inflight instant.
-    checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
-    // checkpoint for next round, no data input, so after the checkpoint,
-    // there should not be REQUESTED Instant
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(2);
-
-    String instant2 = lastPendingInstant();
-    assertNotEquals(instant, instant2);
-
-    final OperatorEvent nextEvent2 = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent2, instanceOf(WriteMetadataEvent.class));
-    List<WriteStatus> writeStatuses2 = ((WriteMetadataEvent) nextEvent2).getWriteStatuses();
-    assertNotNull(writeStatuses2);
-    assertThat(writeStatuses2.size(), is(0)); // write empty statuses
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent2);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    funcWrapper.checkpointComplete(2);
-    // started a new instant already
-    checkInflightInstant();
-    checkInstantState(HoodieInstant.State.COMPLETED, instant);
+    preparePipeline()
+        .consume(TestData.DATA_SET_INSERT)
+        // no checkpoint, so the coordinator does not accept any events
+        .emptyEventBuffer()
+        .checkpoint(1)
+        .assertNextEvent(4, "par1,par2,par3,par4")
+        .checkpointComplete(1)
+        // checkpoint for next round, no data input, so after the checkpoint,
+        // there should not be REQUESTED Instant
+        // this triggers the data write and event send
+        .checkpoint(2)
+        .assertEmptyEvent()
+        .emptyCheckpoint(2)
+        .end();
   }
 
   @Test
   public void testCheckpointFails() throws Exception {
     // reset the config option
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
-    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-    // open the function and ingest data
-    funcWrapper.openFunction();
-    // no data written and triggers checkpoint fails,
-    // then we should revert the start instant
-
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-
-    String instant = lastPendingInstant();
-    assertNotNull(instant);
-
-    final OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-    List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
-    assertNotNull(writeStatuses);
-    assertThat(writeStatuses.size(), is(0)); // no data write
-
-    // fails the checkpoint
-    funcWrapper.checkpointFails(1);
-    assertFalse(funcWrapper.getCoordinatorContext().isJobFailed(),
-        "The last checkpoint was aborted, ignore the events");
-
-    // the instant metadata should be reused
-    checkInstantState(REQUESTED, instant);
-    checkInstantState(HoodieInstant.State.COMPLETED, null);
-
-    for (RowData rowData : TestData.DATA_SET_INSERT) {
-      funcWrapper.invoke(rowData);
-    }
-
-    // this returns early because there is no inflight instant
-    assertDoesNotThrow(() -> funcWrapper.checkpointFunction(2),
-        "The stream writer reuse the last instant time when waiting for the last instant commit timeout");
-    // do not send the write event and fails the checkpoint,
-    // behaves like the last checkpoint is successful.
-    funcWrapper.checkpointFails(2);
+    preparePipeline(conf)
+        // no data written and triggers checkpoint fails,
+        // then we should revert the start instant
+        .checkpoint(1)
+        .assertEmptyEvent()
+        .checkpointFails(1)
+        .consume(TestData.DATA_SET_INSERT)
+        .checkpointNotThrow(2,
+            "The stream writer reuse the last instant time when waiting for the last instant commit timeout")
+        // do not send the write event and fails the checkpoint,
+        // behaves like the last checkpoint is successful.
+        .checkpointFails(2)
+        .end();
   }
 
   @Test
   public void testSubtaskFails() throws Exception {
     // open the function and ingest data
-    funcWrapper.openFunction();
-    // no data written and triggers checkpoint fails,
-    // then we should revert the start instant
-
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-    funcWrapper.getNextEvent();
-
-    String instant1 = lastPendingInstant();
-    assertNotNull(instant1);
-
-    // fails the subtask
-    funcWrapper.subTaskFails(0);
-
-    String instant2 = lastPendingInstant();
-    assertNotEquals(instant2, instant1, "The previous instant should be rolled back when starting new instant");
-
-    checkInstantState(HoodieInstant.State.COMPLETED, null);
+    preparePipeline()
+        .checkpoint(1)
+        .assertEmptyEvent()
+        .subTaskFails(0)
+        .noCompleteInstant()
+        .end();
   }
 
   @Test
   public void testInsert() throws Exception {
     // open the function and ingest data
-    funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_INSERT) {
-      funcWrapper.invoke(rowData);
-    }
-
-    assertEmptyDataFiles();
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-
-    String instant = lastPendingInstant();
-
-    final OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    checkInstantState(REQUESTED, instant);
-    funcWrapper.checkpointComplete(1);
-    checkWrittenData(tempFile, EXPECTED1);
-    // the coordinator checkpoint commits the inflight instant.
-    checkInstantState(HoodieInstant.State.COMPLETED, instant);
-    checkWrittenData(tempFile, EXPECTED1);
+    preparePipeline()
+        .consume(TestData.DATA_SET_INSERT)
+            .assertEmptyDataFiles()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .checkWrittenData(EXPECTED1)
+        .end();
   }
 
   @Test
   public void testInsertDuplicates() throws Exception {
     // reset the config option
     conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
-    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
-    // open the function and ingest data
-    funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
-      funcWrapper.invoke(rowData);
-    }
-
-    assertEmptyDataFiles();
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-
-    OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    funcWrapper.checkpointComplete(1);
-
-    checkWrittenData(tempFile, EXPECTED3, 1);
-
-    // insert duplicates again
-    for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
-      funcWrapper.invoke(rowData);
-    }
-
-    funcWrapper.checkpointFunction(2);
-
-    nextEvent = funcWrapper.getNextEvent();
-    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
-    funcWrapper.checkpointComplete(2);
-
-    checkWrittenData(tempFile, EXPECTED3, 1);
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+        .assertEmptyDataFiles()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .checkWrittenData(EXPECTED3, 1)
+        // insert duplicates again
+        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .checkWrittenData(EXPECTED3, 1)
+        .end();
   }
 
   @Test
   public void testUpsert() throws Exception {
     // open the function and ingest data
-    funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_INSERT) {
-      funcWrapper.invoke(rowData);
-    }
-
-    assertEmptyDataFiles();
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-
-    OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    funcWrapper.checkpointComplete(1);
-
-    // upsert another data buffer
-    for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
-      funcWrapper.invoke(rowData);
-    }
-    // the data is not flushed yet
-    checkWrittenData(tempFile, EXPECTED1);
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(2);
-
-    String instant = lastPendingInstant();
-
-    nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    checkInstantState(REQUESTED, instant);
-    funcWrapper.checkpointComplete(2);
-    // the coordinator checkpoint commits the inflight instant.
-    checkInstantState(HoodieInstant.State.COMPLETED, instant);
-    checkWrittenData(tempFile, EXPECTED2);
+    preparePipeline()
+        .consume(TestData.DATA_SET_INSERT)
+        .assertEmptyDataFiles()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        // upsert another data buffer
+        .consume(TestData.DATA_SET_UPDATE_INSERT)
+        // the data is not flushed yet
+        .checkWrittenData(EXPECTED1)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .checkWrittenData(EXPECTED2)
+        .end();
   }
 
   @Test
   public void testUpsertWithDelete() throws Exception {
     // open the function and ingest data
-    funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_INSERT) {
-      funcWrapper.invoke(rowData);
-    }
-
-    assertEmptyDataFiles();
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-
-    OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    funcWrapper.checkpointComplete(1);
-
-    // upsert another data buffer
-    for (RowData rowData : TestData.DATA_SET_UPDATE_DELETE) {
-      funcWrapper.invoke(rowData);
-    }
-    // the data is not flushed yet
-    checkWrittenData(tempFile, EXPECTED1);
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(2);
-
-    String instant = lastPendingInstant();
-
-    nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    checkInstantState(REQUESTED, instant);
-    funcWrapper.checkpointComplete(2);
-    // the coordinator checkpoint commits the inflight instant.
-    checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
-    Map<String, String> expected = getUpsertWithDeleteExpected();
-    checkWrittenData(tempFile, expected);
+    preparePipeline()
+        .consume(TestData.DATA_SET_INSERT)
+        .assertEmptyDataFiles()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .consume(TestData.DATA_SET_UPDATE_DELETE)
+        .checkWrittenData(EXPECTED1)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .checkWrittenData(getUpsertWithDeleteExpected())
+        .end();
   }
 
   @Test
   public void testInsertWithMiniBatches() throws Exception {
     // reset the config option
     conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
-    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
-    // open the function and ingest data
-    funcWrapper.openFunction();
-    // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
-    // so 3 records expect to trigger a mini-batch write
-    for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
-      funcWrapper.invoke(rowData);
-    }
-
-    Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
-    assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
-    assertThat("2 records expect to flush out as a mini-batch",
-        dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
-        is(2));
-
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-    dataBuffer = funcWrapper.getDataBuffer();
-    assertThat("All data should be flushed out", dataBuffer.size(), is(0));
-
-    final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
-    final OperatorEvent event2 = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    String instant = lastPendingInstant();
-
-    funcWrapper.checkpointComplete(1);
 
     Map<String, String> expected = getMiniBatchExpected();
-    checkWrittenData(tempFile, expected, 1);
-
-    // started a new instant already
-    checkInflightInstant();
-    checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
-    // insert duplicates again
-    for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
-      funcWrapper.invoke(rowData);
-    }
 
-    funcWrapper.checkpointFunction(2);
-
-    final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
-    final OperatorEvent event4 = funcWrapper.getNextEvent();
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
-    funcWrapper.checkpointComplete(2);
-
-    // Same the original base file content.
-    checkWrittenData(tempFile, expected, 1);
+    preparePipeline(conf)
+        // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+        // so 3 records expect to trigger a mini-batch write
+        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+        .assertDataBuffer(1, 2)
+        .checkpoint(1)
+        .allDataFlushed()
+        .handleEvents(2)
+        .checkpointComplete(1)
+        .checkWrittenData(expected, 1)
+        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+        .checkpoint(2)
+        .handleEvents(2)
+        .checkpointComplete(2)
+        .checkWrittenData(expected, 1)
+        .end();
   }
 
   @Test
@@ -469,129 +215,43 @@ public class TestWriteCopyOnWrite {
     // reset the config option
     conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
     conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
-    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
-    // open the function and ingest data
-    funcWrapper.openFunction();
-    // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
-    // so 3 records expect to trigger a mini-batch write
-    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
-      funcWrapper.invoke(rowData);
-    }
-
-    Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
-    assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
-    assertThat("2 records expect to flush out as a mini-batch",
-        dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
-        is(2));
-
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-    dataBuffer = funcWrapper.getDataBuffer();
-    assertThat("All data should be flushed out", dataBuffer.size(), is(0));
-
-    final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
-    final OperatorEvent event2 = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    String instant = lastPendingInstant();
-
-    funcWrapper.checkpointComplete(1);
 
     Map<String, String> expected = new HashMap<>();
     expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
 
-    checkWrittenData(tempFile, expected, 1);
-
-    // started a new instant already
-    checkInflightInstant();
-    checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
-    // insert duplicates again
-    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
-      funcWrapper.invoke(rowData);
-    }
-
-    funcWrapper.checkpointFunction(2);
-
-    final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
-    final OperatorEvent event4 = funcWrapper.getNextEvent();
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
-    funcWrapper.checkpointComplete(2);
-
-    // Same the original base file content.
-    checkWrittenData(tempFile, expected, 1);
+    preparePipeline(conf)
+        // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+        // so 3 records expect to trigger a mini-batch write
+        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+        .assertDataBuffer(1, 2)
+        .checkpoint(1)
+        .allDataFlushed()
+        .handleEvents(2)
+        .checkpointComplete(1)
+        .checkWrittenData(expected, 1)
+        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+        .checkpoint(2)
+        .handleEvents(2)
+        .checkpointComplete(2)
+        .checkWrittenData(expected, 1)
+        .end();
   }
 
   @Test
   public void testInsertAppendMode() throws Exception {
-    InsertFunctionWrapper<RowData> funcWrapper = new InsertFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
-    // open the function and ingest data
-    funcWrapper.openFunction();
-    // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
-    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
-      funcWrapper.invoke(rowData);
-    }
-
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-    assertNull(funcWrapper.getWriterHelper());
-
-    final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
-    assertThat("The operator expect to send an event", event1, instanceOf(WriteMetadataEvent.class));
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    String instant = lastPendingInstant();
-
-    funcWrapper.checkpointComplete(1);
-
-    Map<String, String> expected = new HashMap<>();
-
-    expected.put("par1", "["
-        + "id1,par1,id1,Danny,23,0,par1, "
-        + "id1,par1,id1,Danny,23,1,par1, "
-        + "id1,par1,id1,Danny,23,2,par1, "
-        + "id1,par1,id1,Danny,23,3,par1, "
-        + "id1,par1,id1,Danny,23,4,par1]");
-
-    TestData.checkWrittenAllData(tempFile, expected, 1);
-
-    // started a new instant already
-    checkInflightInstant();
-    checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
-    // insert duplicates again
-    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
-      funcWrapper.invoke(rowData);
-    }
-
-    funcWrapper.checkpointFunction(2);
-
-    final OperatorEvent event2 = funcWrapper.getNextEvent(); // remove the first event first
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
-    funcWrapper.checkpointComplete(2);
-
-    // same with the original base file content.
-    expected.put("par1", "["
-        + "id1,par1,id1,Danny,23,0,par1, "
-        + "id1,par1,id1,Danny,23,0,par1, "
-        + "id1,par1,id1,Danny,23,1,par1, "
-        + "id1,par1,id1,Danny,23,1,par1, "
-        + "id1,par1,id1,Danny,23,2,par1, "
-        + "id1,par1,id1,Danny,23,2,par1, "
-        + "id1,par1,id1,Danny,23,3,par1, "
-        + "id1,par1,id1,Danny,23,3,par1, "
-        + "id1,par1,id1,Danny,23,4,par1, "
-        + "id1,par1,id1,Danny,23,4,par1]");
-    TestData.checkWrittenAllData(tempFile, expected, 1);
+    prepareInsertPipeline()
+        // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
+        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .checkWrittenAllData(EXPECTED4, 1)
+        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .checkWrittenFullData(EXPECTED5)
+        .end();
   }
 
   /**
@@ -604,145 +264,54 @@ public class TestWriteCopyOnWrite {
     conf.setString(FlinkOptions.OPERATION, "insert");
     conf.setBoolean(FlinkOptions.INSERT_CLUSTER, true);
     conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes buffer size
-    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
-    // open the function and ingest data
-    funcWrapper.openFunction();
-    // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
-    // so 3 records expect to trigger a mini-batch write
-    // flush the max size bucket once at a time.
-    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
-      funcWrapper.invoke(rowData);
-    }
-
-    Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
-    assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
-    assertThat("2 records expect to flush out as a mini-batch",
-        dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
-        is(2));
-
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-    dataBuffer = funcWrapper.getDataBuffer();
-    assertThat("All data should be flushed out", dataBuffer.size(), is(0));
-
-    for (int i = 0; i < 2; i++) {
-      final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
-      assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
-      funcWrapper.getCoordinator().handleEventFromOperator(0, event);
-    }
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    String instant = lastPendingInstant();
-
-    funcWrapper.checkpointComplete(1);
-
-    Map<String, String> expected = new HashMap<>();
-
-    expected.put("par1", "["
-        + "id1,par1,id1,Danny,23,0,par1, "
-        + "id1,par1,id1,Danny,23,1,par1, "
-        + "id1,par1,id1,Danny,23,2,par1, "
-        + "id1,par1,id1,Danny,23,3,par1, "
-        + "id1,par1,id1,Danny,23,4,par1]");
-    TestData.checkWrittenData(tempFile, expected, 1);
-
-    // started a new instant already
-    checkInflightInstant();
-    checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
-    // insert duplicates again
-    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
-      funcWrapper.invoke(rowData);
-    }
-
-    funcWrapper.checkpointFunction(2);
-
-    for (int i = 0; i < 2; i++) {
-      final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
-      funcWrapper.getCoordinator().handleEventFromOperator(0, event);
-    }
-
-    funcWrapper.checkpointComplete(2);
-
-    // same with the original base file content.
-    Map<String, List<String>> expected2 = new HashMap<>();
-    expected2.put("par1", Arrays.asList(
-        "id1,par1,id1,Danny,23,0,par1",
-        "id1,par1,id1,Danny,23,0,par1",
-        "id1,par1,id1,Danny,23,1,par1",
-        "id1,par1,id1,Danny,23,1,par1",
-        "id1,par1,id1,Danny,23,2,par1",
-        "id1,par1,id1,Danny,23,2,par1",
-        "id1,par1,id1,Danny,23,3,par1",
-        "id1,par1,id1,Danny,23,3,par1",
-        "id1,par1,id1,Danny,23,4,par1",
-        "id1,par1,id1,Danny,23,4,par1"));
-
-    // Same the original base file content.
-    TestData.checkWrittenFullData(tempFile, expected2);
+    TestWriteMergeOnRead.TestHarness.instance()
+        // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+        // so 3 records expect to trigger a mini-batch write
+        // flush the max size bucket once at a time.
+        .preparePipeline(tempFile, conf)
+        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+        .assertDataBuffer(1, 2)
+        .checkpoint(1)
+        .allDataFlushed()
+        .handleEvents(2)
+        .checkpointComplete(1)
+        .checkWrittenData(EXPECTED4, 1)
+        // insert duplicates again
+        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+        .checkpoint(2)
+        .handleEvents(2)
+        .checkpointComplete(2)
+        .checkWrittenFullData(EXPECTED5)
+        .end();
   }
 
   @Test
   public void testInsertWithSmallBufferSize() throws Exception {
     // reset the config option
     conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes buffer size
-    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
-    // open the function and ingest data
-    funcWrapper.openFunction();
-    // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
-    // so 3 records expect to trigger a mini-batch write
-    // flush the max size bucket once at a time.
-    for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
-      funcWrapper.invoke(rowData);
-    }
-
-    Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
-    assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
-    assertThat("2 records expect to flush out as a mini-batch",
-        dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
-        is(2));
-
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-    dataBuffer = funcWrapper.getDataBuffer();
-    assertThat("All data should be flushed out", dataBuffer.size(), is(0));
-
-    for (int i = 0; i < 2; i++) {
-      final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
-      assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
-      funcWrapper.getCoordinator().handleEventFromOperator(0, event);
-    }
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    String instant = lastPendingInstant();
-
-    funcWrapper.checkpointComplete(1);
 
     Map<String, String> expected = getMiniBatchExpected();
-    checkWrittenData(tempFile, expected, 1);
-
-    // started a new instant already
-    checkInflightInstant();
-    checkInstantState(HoodieInstant.State.COMPLETED, instant);
-
-    // insert duplicates again
-    for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
-      funcWrapper.invoke(rowData);
-    }
 
-    funcWrapper.checkpointFunction(2);
-
-    for (int i = 0; i < 2; i++) {
-      final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
-      funcWrapper.getCoordinator().handleEventFromOperator(0, event);
-    }
-
-    funcWrapper.checkpointComplete(2);
-
-    // Same the original base file content.
-    checkWrittenData(tempFile, expected, 1);
+    preparePipeline(conf)
+        // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+        // so 3 records expect to trigger a mini-batch write
+        // flush the max size bucket once at a time.
+        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+        .assertDataBuffer(1, 2)
+        .checkpoint(1)
+        .allDataFlushed()
+        .handleEvents(2)
+        .checkpointComplete(1)
+        .checkWrittenData(expected, 1)
+        // insert duplicates again
+        .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+        .checkpoint(2)
+        .handleEvents(2)
+        .checkpointComplete(2)
+        // Same the original base file content.
+        .checkWrittenData(expected, 1)
+        .end();
   }
 
   protected Map<String, String> getMiniBatchExpected() {
@@ -772,71 +341,38 @@ public class TestWriteCopyOnWrite {
   @Test
   public void testIndexStateBootstrap() throws Exception {
     // open the function and ingest data
-    funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_INSERT) {
-      funcWrapper.invoke(rowData);
-    }
-
-    assertEmptyDataFiles();
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-
-    OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    funcWrapper.checkpointComplete(1);
-
-    // the data is not flushed yet
-    checkWrittenData(tempFile, EXPECTED1);
+    preparePipeline()
+        .consume(TestData.DATA_SET_INSERT)
+        .assertEmptyDataFiles()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .checkWrittenData(EXPECTED1, 4)
+        .end();
 
     // reset the config option
     conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
-    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
-    // upsert another data buffer
-    funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
-      funcWrapper.invoke(rowData);
-    }
-
-    checkIndexLoaded(
-        new HoodieKey("id1", "par1"),
-        new HoodieKey("id2", "par1"),
-        new HoodieKey("id3", "par2"),
-        new HoodieKey("id4", "par2"),
-        new HoodieKey("id5", "par3"),
-        new HoodieKey("id6", "par3"),
-        new HoodieKey("id7", "par4"),
-        new HoodieKey("id8", "par4"),
-        new HoodieKey("id9", "par3"),
-        new HoodieKey("id10", "par4"),
-        new HoodieKey("id11", "par4"));
-
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-
-    assertTrue(funcWrapper.isAlreadyBootstrap());
-
-    String instant = lastPendingInstant();
-
-    nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-
-    Map<String, String> expected = getExpectedBeforeCheckpointComplete();
-    checkWrittenData(tempFile, expected);
-
-    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
-    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
-
-    checkInstantState(REQUESTED, instant);
-
-    funcWrapper.checkpointComplete(1);
-    // the coordinator checkpoint commits the inflight instant.
-    checkInstantState(HoodieInstant.State.COMPLETED, instant);
-    checkWrittenData(tempFile, EXPECTED2);
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_UPDATE_INSERT)
+        .checkIndexLoaded(
+            new HoodieKey("id1", "par1"),
+            new HoodieKey("id2", "par1"),
+            new HoodieKey("id3", "par2"),
+            new HoodieKey("id4", "par2"),
+            new HoodieKey("id5", "par3"),
+            new HoodieKey("id6", "par3"),
+            new HoodieKey("id7", "par4"),
+            new HoodieKey("id8", "par4"),
+            new HoodieKey("id9", "par3"),
+            new HoodieKey("id10", "par4"),
+            new HoodieKey("id11", "par4"))
+        .checkpoint(1)
+        .assertBootstrapped()
+        .assertNextEvent()
+        .checkWrittenData(getExpectedBeforeCheckpointComplete())
+        .checkpointComplete(1)
+        .checkWrittenData(EXPECTED2)
+        .end();
   }
 
   @Test
@@ -844,46 +380,18 @@ public class TestWriteCopyOnWrite {
     // reset the config option
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 1L);
     conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
-    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
-    // open the function and ingest data
-
-    funcWrapper.openFunction();
-    for (RowData rowData : TestData.DATA_SET_INSERT) {
-      funcWrapper.invoke(rowData);
-    }
-
-    // no checkpoint, so the coordinator does not accept any events
-    assertTrue(
-        funcWrapper.getEventBuffer().length == 1
-            && funcWrapper.getEventBuffer()[0] == null, "The coordinator events buffer expect to be empty");
-
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(1);
-    assertTrue(funcWrapper.isConforming(), "The write function should be waiting for the instant to commit");
-
-    for (int i = 0; i < 4; i++) {
-      final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
-      assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
-      funcWrapper.getCoordinator().handleEventFromOperator(0, event);
-    }
-
-    funcWrapper.checkpointComplete(1);
-
-    for (RowData rowData : TestData.DATA_SET_INSERT) {
-      funcWrapper.invoke(rowData);
-    }
-
-    assertFalse(funcWrapper.isConforming(), "The write function should finish waiting for the instant to commit");
-
-    // checkpoint for the next round
-    funcWrapper.checkpointFunction(2);
-
-    assertDoesNotThrow(() -> {
-      for (RowData rowData : TestData.DATA_SET_INSERT) {
-        funcWrapper.invoke(rowData);
-      }
-    }, "The stream writer reuse the last instant time when waiting for the last instant commit timeout");
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_INSERT)
+        .emptyEventBuffer()
+        .checkpoint(1)
+        .assertConfirming()
+        .handleEvents(4)
+        .checkpointComplete(1)
+        .consume(TestData.DATA_SET_INSERT)
+        .assertNotConfirming()
+        .checkpoint(2)
+        .assertConsumeDoesNotThrow(TestData.DATA_SET_INSERT)
+        .end();
   }
 
   @Test
@@ -903,59 +411,19 @@ public class TestWriteCopyOnWrite {
   //  Utilities
   // -------------------------------------------------------------------------
 
-  private void checkInflightInstant() {
-    final String instant = TestUtils.getLastPendingInstant(tempFile.getAbsolutePath());
-    assertNotNull(instant);
-  }
-
-  private void checkInstantState(HoodieInstant.State state, String instantStr) {
-    final String instant;
-    switch (state) {
-      case REQUESTED:
-        instant = lastPendingInstant();
-        break;
-      case COMPLETED:
-        instant = lastCompleteInstant();
-        break;
-      default:
-        throw new AssertionError("Unexpected state");
-    }
-    assertThat(instant, is(instantStr));
+  private TestHarness preparePipeline() throws Exception {
+    return TestHarness.instance().preparePipeline(tempFile, conf);
   }
 
-  protected String lastPendingInstant() {
-    return TestUtils.getLastPendingInstant(tempFile.getAbsolutePath());
+  private TestHarness preparePipeline(Configuration conf) throws Exception {
+    return TestHarness.instance().preparePipeline(tempFile, conf);
   }
 
-  protected String lastCompleteInstant() {
-    return TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+  protected TestHarness prepareInsertPipeline() throws Exception {
+    return TestHarness.instance().preparePipeline(tempFile, conf, true);
   }
 
   protected HoodieTableType getTableType() {
     return HoodieTableType.COPY_ON_WRITE;
   }
-
-  protected void checkWrittenData(File baseFile, Map<String, String> expected) throws Exception {
-    checkWrittenData(baseFile, expected, 4);
-  }
-
-  protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
-    TestData.checkWrittenData(baseFile, expected, partitions);
-  }
-
-  /**
-   * Asserts the data files are empty.
-   */
-  protected void assertEmptyDataFiles() {
-    File[] dataFiles = tempFile.listFiles(file -> !file.getName().startsWith("."));
-    assertNotNull(dataFiles);
-    assertThat(dataFiles.length, is(0));
-  }
-
-  private void checkIndexLoaded(HoodieKey... keys) {
-    for (HoodieKey key : keys) {
-      assertTrue(funcWrapper.isKeyInState(key),
-          "Key: " + key + " assumes to be in the index state");
-    }
-  }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 064857a..a35a0ac 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -18,25 +18,11 @@
 
 package org.apache.hudi.sink;
 
-import org.apache.hudi.client.FlinkTaskContextSupplier;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.util.StreamerUtil;
-import org.apache.hudi.utils.TestData;
 
-import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.jupiter.api.BeforeEach;
 
-import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -44,19 +30,6 @@ import java.util.Map;
  * Test cases for delta stream write.
  */
 public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
-  private FileSystem fs;
-  private HoodieWriteConfig writeConfig;
-  private HoodieFlinkEngineContext context;
-
-  @BeforeEach
-  public void before() throws Exception {
-    super.before();
-    fs = FSUtils.getFs(tempFile.getAbsolutePath(), new org.apache.hadoop.conf.Configuration());
-    writeConfig = StreamerUtil.getHoodieClientConfig(conf);
-    context = new HoodieFlinkEngineContext(
-        new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-        new FlinkTaskContextSupplier(null));
-  }
 
   @Override
   protected void setUp(Configuration conf) {
@@ -69,14 +42,6 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
   }
 
   @Override
-  protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
-    HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, context).getMetaClient();
-    Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
-    String latestInstant = lastCompleteInstant();
-    TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
-  }
-
-  @Override
   protected Map<String, String> getExpectedBeforeCheckpointComplete() {
     return EXPECTED1;
   }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 7530c89..704d94c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -20,7 +20,6 @@ package org.apache.hudi.sink;
 
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.utils.TestUtils;
 
 import org.apache.flink.configuration.Configuration;
 
@@ -59,9 +58,4 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
   }
-
-  @Override
-  protected String lastCompleteInstant() {
-    return TestUtils.getLastDeltaCompleteInstant(tempFile.getAbsolutePath());
-  }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index 4dc197c..642a407 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -47,7 +47,7 @@ import java.util.concurrent.CompletableFuture;
  *
  * @param <I> Input type
  */
-public class InsertFunctionWrapper<I> {
+public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
   private final Configuration conf;
   private final RowType rowType;
 
@@ -115,6 +115,11 @@ public class InsertFunctionWrapper<I> {
     return coordinator;
   }
 
+  @Override
+  public void close() throws Exception {
+    this.coordinator.close();
+  }
+
   public BulkInsertWriterHelper getWriterHelper() {
     return this.writeFunction.getWriterHelper();
   }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index c65224a..54a142a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -66,7 +66,7 @@ import java.util.concurrent.CompletableFuture;
  *
  * @param <I> Input type
  */
-public class StreamWriteFunctionWrapper<I> {
+public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
   private final Configuration conf;
 
   private final IOManager ioManager;
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
new file mode 100644
index 0000000..d2fe819
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Define the common interfaces for test function wrappers.
+ */
+public interface TestFunctionWrapper<I> {
+  /**
+   * Open all the functions within this wrapper.
+   */
+  void openFunction() throws Exception;
+
+  /**
+   * Process the given input record {@code record}.
+   */
+  void invoke(I record) throws Exception;
+
+  /**
+   * Returns the event buffer sent by the write tasks.
+   */
+  WriteMetadataEvent[] getEventBuffer();
+
+  /**
+   * Returns the next event.
+   */
+  OperatorEvent getNextEvent();
+
+  /**
+   * Snapshot all the functions in the wrapper.
+   */
+  void checkpointFunction(long checkpointId) throws Exception;
+
+  /**
+   * Mark checkpoint with id {code checkpointId} as success.
+   */
+  void checkpointComplete(long checkpointId);
+
+  /**
+   * Returns the operator coordinator.
+   */
+  StreamWriteOperatorCoordinator getCoordinator();
+
+  /**
+   * Returns the data buffer of the write task.
+   */
+  default Map<String, List<HoodieRecord>> getDataBuffer() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Mark checkpoint with id {code checkpointId} as failed.
+   */
+  default void checkpointFails(long checkpointId) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns the context of the coordinator.
+   */
+  default MockOperatorCoordinatorContext getCoordinatorContext() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Mark sub-task with id {@code taskId} as failed.
+   */
+  default void subTaskFails(int taskId) throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns whether the given key {@code key} is in the state store.
+   */
+  default boolean isKeyInState(HoodieKey key) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns whether the bootstrap function already bootstrapped.
+   */
+  default boolean isAlreadyBootstrap() throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns whether the write task is confirming.
+   */
+  default boolean isConforming() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Close this function wrapper.
+   */
+  void close() throws Exception;
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
new file mode 100644
index 0000000..e3b1226
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.FileSystem;
+import org.hamcrest.MatcherAssert;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class for write test cases.
+ */
+public class TestWriteBase {
+  protected static final Map<String, String> EXPECTED1 = new HashMap<>();
+
+  protected static final Map<String, String> EXPECTED2 = new HashMap<>();
+
+  protected static final Map<String, String> EXPECTED3 = new HashMap<>();
+
+  protected static final Map<String, String> EXPECTED4 = new HashMap<>();
+
+  protected static final Map<String, List<String>> EXPECTED5 = new HashMap<>();
+
+  static {
+    EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
+    EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
+    EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
+    EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+
+    EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
+    EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]");
+    EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, "
+        + "id9,par3,id9,Jane,19,6,par3]");
+    EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, "
+        + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+
+    EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
+
+    EXPECTED4.put("par1", "["
+        + "id1,par1,id1,Danny,23,0,par1, "
+        + "id1,par1,id1,Danny,23,1,par1, "
+        + "id1,par1,id1,Danny,23,2,par1, "
+        + "id1,par1,id1,Danny,23,3,par1, "
+        + "id1,par1,id1,Danny,23,4,par1]");
+
+    EXPECTED5.put("par1", Arrays.asList(
+        "id1,par1,id1,Danny,23,0,par1",
+        "id1,par1,id1,Danny,23,0,par1",
+        "id1,par1,id1,Danny,23,1,par1",
+        "id1,par1,id1,Danny,23,1,par1",
+        "id1,par1,id1,Danny,23,2,par1",
+        "id1,par1,id1,Danny,23,2,par1",
+        "id1,par1,id1,Danny,23,3,par1",
+        "id1,par1,id1,Danny,23,3,par1",
+        "id1,par1,id1,Danny,23,4,par1",
+        "id1,par1,id1,Danny,23,4,par1"));
+  }
+
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
+  /**
+   * Utilities to compose the test stages.
+   */
+  public static class TestHarness {
+    public static TestHarness instance() {
+      return new TestHarness();
+    }
+
+    private File baseFile;
+    private String basePath;
+    private Configuration conf;
+    private TestFunctionWrapper<RowData> pipeline;
+
+    private String lastPending;
+    private String lastComplete;
+
+    public TestHarness preparePipeline(File basePath, Configuration conf) throws Exception {
+      preparePipeline(basePath, conf, false);
+      return this;
+    }
+
+    public TestHarness preparePipeline(File basePath, Configuration conf, boolean append) throws Exception {
+      this.baseFile = basePath;
+      this.basePath = this.baseFile.getAbsolutePath();
+      this.conf = conf;
+      this.pipeline = append
+          ? new InsertFunctionWrapper<>(this.basePath, conf)
+          : new StreamWriteFunctionWrapper<>(this.basePath, conf);
+      // open the function and ingest data
+      this.pipeline.openFunction();
+      return this;
+    }
+
+    public TestHarness consume(List<RowData> inputs) throws Exception {
+      for (RowData rowData : inputs) {
+        this.pipeline.invoke(rowData);
+      }
+      return this;
+    }
+
+    public TestHarness assertConsumeDoesNotThrow(List<RowData> inputs) {
+      assertDoesNotThrow(() -> {
+        consume(inputs);
+      }, "The stream writer reuse the last instant time when waiting for the last instant commit timeout");
+      return this;
+    }
+
+    /**
+     * Assert the event buffer is empty.
+     */
+    public TestHarness emptyEventBuffer() {
+      assertTrue(
+          this.pipeline.getEventBuffer().length == 1
+              && this.pipeline.getEventBuffer()[0] == null,
+          "The coordinator events buffer expect to be empty");
+      return this;
+    }
+
+    /**
+     * Assert the next event exists and handle over it to the coordinator.
+     */
+    public TestHarness assertNextEvent() {
+      final OperatorEvent nextEvent = this.pipeline.getNextEvent();
+      MatcherAssert.assertThat("The operator is expected to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
+      this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
+      assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed the event");
+      return this;
+    }
+
+    /**
+     * Assert that the next event exists and hand it over to the coordinator.
+     *
+     * @param numWriteStatus The expected number of write statuses reported by the event
+     * @param partitions     The written partitions reported by the event
+     */
+    public TestHarness assertNextEvent(int numWriteStatus, String partitions) {
+      final OperatorEvent nextEvent = this.pipeline.getNextEvent();
+      MatcherAssert.assertThat("The operator is expected to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
+      List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
+      assertNotNull(writeStatuses);
+      MatcherAssert.assertThat(writeStatuses.size(), is(numWriteStatus));
+      assertThat(writeStatuses.stream()
+              .map(WriteStatus::getPartitionPath).sorted(Comparator.naturalOrder())
+              .collect(Collectors.joining(",")),
+          is(partitions));
+      this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
+      assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed the event");
+      return this;
+    }
+
+    /**
+     * Assert that the next event exists and hand it over to the coordinator.
+     *
+     * <p>Validates that the write metadata reported by the event is empty.
+     */
+    public TestHarness assertEmptyEvent() {
+      final OperatorEvent nextEvent = this.pipeline.getNextEvent();
+      MatcherAssert.assertThat("The operator is expected to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
+      List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
+      assertNotNull(writeStatuses);
+      MatcherAssert.assertThat(writeStatuses.size(), is(0));
+      this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
+      assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed the event");
+      return this;
+    }
+
+    /**
+     * Assert the data buffer with given number of buckets and records.
+     */
+    public TestHarness assertDataBuffer(int numBuckets, int numRecords) {
+      Map<String, List<HoodieRecord>> dataBuffer = this.pipeline.getDataBuffer();
+      assertThat("Should have " + numBuckets + " data bucket", dataBuffer.size(), is(numBuckets));
+      assertThat(numRecords + " records are expected to flush out as a mini-batch",
+          dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
+          is(numRecords));
+      return this;
+    }
+
+    /**
+     * Checkpoints the pipeline, which triggers the data write and event send.
+     */
+    public TestHarness checkpoint(long checkpointId) throws Exception {
+      this.pipeline.checkpointFunction(checkpointId);
+      return this;
+    }
+
+    public TestHarness allDataFlushed() {
+      Map<String, List<HoodieRecord>> dataBuffer = this.pipeline.getDataBuffer();
+      assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+      return this;
+    }
+
+    /**
+     * Take the next {@code numEvents} events and hand them over to the coordinator.
+     */
+    public TestHarness handleEvents(int numEvents) {
+      for (int i = 0; i < numEvents; i++) {
+        final OperatorEvent event = this.pipeline.getNextEvent(); // remove the first event first
+        assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
+        this.pipeline.getCoordinator().handleEventFromOperator(0, event);
+      }
+      assertNotNull(this.pipeline.getEventBuffer()[0], "The coordinator missed the event");
+      return this;
+    }
+
+    /**
+     * Mark the checkpoint with id {@code checkpointId} as finished.
+     */
+    public TestHarness checkpointComplete(long checkpointId) {
+      this.lastPending = lastPendingInstant();
+      this.pipeline.checkpointComplete(checkpointId);
+      // started a new instant already
+      checkInflightInstant();
+      checkInstantState(HoodieInstant.State.COMPLETED, lastPending);
+      this.lastComplete = lastPending;
+      this.lastPending = lastPendingInstant(); // refresh last pending instant
+      return this;
+    }
+
+    /**
+     * Mark the checkpoint finished with empty write metadata.
+     */
+    public TestHarness emptyCheckpoint(long checkpointId) {
+      String lastPending = lastPendingInstant();
+      this.pipeline.checkpointComplete(checkpointId);
+      // last pending instant was reused
+      assertEquals(this.lastPending, lastPending);
+      checkInstantState(HoodieInstant.State.COMPLETED, lastComplete);
+      return this;
+    }
+
+    /**
+     * Mark the checkpoint with id {@code checkpointId} as failed.
+     */
+    public TestHarness checkpointFails(long checkpointId) {
+      this.pipeline.checkpointFails(checkpointId);
+      assertFalse(this.pipeline.getCoordinatorContext().isJobFailed(),
+          "The last checkpoint was aborted, ignore the events");
+      // no complete instant
+      checkInstantState(HoodieInstant.State.COMPLETED, null);
+      return this;
+    }
+
+    public TestHarness checkpointNotThrow(long checkpointId, String message) {
+      // this returns early because there is no inflight instant
+      assertDoesNotThrow(() -> checkpoint(checkpointId), message);
+      return this;
+    }
+
+    /**
+     * Mark the task with id {@code taskId} as failed.
+     */
+    public TestHarness subTaskFails(int taskId) throws Exception {
+      // fails the subtask
+      String instant1 = lastPendingInstant();
+      this.pipeline.subTaskFails(taskId);
+
+      String instant2 = lastPendingInstant();
+      assertNotEquals(instant2, instant1, "The previous instant should be rolled back when starting a new instant");
+      return this;
+    }
+
+    public TestHarness noCompleteInstant() {
+      // no complete instant
+      checkInstantState(HoodieInstant.State.COMPLETED, null);
+      return this;
+    }
+
+    /**
+     * Asserts that no data files have been written under the base path.
+     */
+    public TestHarness assertEmptyDataFiles() {
+      File[] dataFiles = baseFile.listFiles(file -> !file.getName().startsWith("."));
+      assertNotNull(dataFiles);
+      assertThat(dataFiles.length, is(0));
+      return this;
+    }
+
+    public TestHarness checkWrittenData(Map<String, String> expected) throws Exception {
+      checkWrittenData(expected, 4);
+      return this;
+    }
+
+    public TestHarness checkWrittenData(
+        Map<String, String> expected,
+        int partitions) throws Exception {
+      if (OptionsResolver.isCowTable(conf) || conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
+        TestData.checkWrittenData(this.baseFile, expected, partitions);
+      } else {
+        checkWrittenDataMor(baseFile, expected, partitions);
+      }
+      return this;
+    }
+
+    private void checkWrittenDataMor(File baseFile, Map<String, String> expected, int partitions) throws Exception {
+      HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath);
+      Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      String latestInstant = lastCompleteInstant();
+      FileSystem fs = FSUtils.getFs(basePath, new org.apache.hadoop.conf.Configuration());
+      TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
+    }
+
+    public TestHarness checkWrittenFullData(Map<String, List<String>> expected) throws IOException {
+      TestData.checkWrittenFullData(this.baseFile, expected);
+      return this;
+    }
+
+    public TestHarness checkWrittenAllData(Map<String, String> expected, int partitions) throws IOException {
+      TestData.checkWrittenAllData(baseFile, expected, partitions);
+      return this;
+    }
+
+    public TestHarness checkIndexLoaded(HoodieKey... keys) {
+      for (HoodieKey key : keys) {
+        assertTrue(this.pipeline.isKeyInState(key),
+            "Key: " + key + " assumes to be in the index state");
+      }
+      return this;
+    }
+
+    public TestHarness assertBootstrapped() throws Exception {
+      assertTrue(this.pipeline.isAlreadyBootstrap());
+      return this;
+    }
+
+    public TestHarness assertConfirming() {
+      assertTrue(this.pipeline.isConforming(),
+          "The write function should be waiting for the instant to commit");
+      return this;
+    }
+
+    public TestHarness assertNotConfirming() {
+      assertFalse(this.pipeline.isConforming(),
+          "The write function should finish waiting for the instant to commit");
+      return this;
+    }
+
+    public void end() throws Exception {
+      this.pipeline.close();
+    }
+
+    private String lastPendingInstant() {
+      return TestUtils.getLastPendingInstant(basePath);
+    }
+
+    private void checkInflightInstant() {
+      final String instant = TestUtils.getLastPendingInstant(basePath);
+      assertNotNull(instant);
+    }
+
+    private void checkInstantState(HoodieInstant.State state, String instantStr) {
+      final String instant;
+      switch (state) {
+        case REQUESTED:
+          instant = lastPendingInstant();
+          break;
+        case COMPLETED:
+          instant = lastCompleteInstant();
+          break;
+        default:
+          throw new AssertionError("Unexpected state");
+      }
+      assertThat(instant, is(instantStr));
+    }
+
+    protected String lastCompleteInstant() {
+      return OptionsResolver.isMorTable(conf)
+          ? TestUtils.getLastDeltaCompleteInstant(basePath)
+          : TestUtils.getLastCompleteInstant(basePath);
+    }
+  }
+}
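
For illustration, a test case refactored on top of this harness chains the stages fluently. The sketch below is an assumed shape of such a case (the tempFile/conf fixtures and the input rows are placeholders), not a test taken from the patch:

    // Hypothetical copy-on-write test composed from the TestHarness stages above.
    TestHarness.instance()
        .preparePipeline(tempFile, conf)           // assumed temp folder and Flink configuration
        .consume(inputRows)                        // assumed List<RowData> input
        .assertEmptyDataFiles()                    // nothing is written before the checkpoint
        .checkpoint(1)                             // flushes the buffer and emits the event
        .assertNextEvent(4, "par1,par2,par3,par4") // one write status per written partition
        .checkpointComplete(1)                     // commits the instant
        .checkWrittenData(EXPECTED1)
        .end();
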
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index b0f7b5f..e8e177b 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.utils;
 
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
@@ -543,17 +542,15 @@ public class TestData {
     // 1. init flink table
     HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
-    FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null);
-    HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(supplier);
-    HoodieFlinkTable table = HoodieFlinkTable.create(config, context, metaClient);
+    HoodieFlinkTable table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
 
     // 2. check each partition data
     expected.forEach((partition, partitionDataSet) -> {
 
       List<String> readBuffer = new ArrayList<>();
 
-      table.getFileSystemView().getAllFileGroups(partition)
-          .forEach(v -> v.getLatestDataFile().ifPresent(baseFile -> {
+      table.getBaseFileOnlyView().getLatestBaseFiles(partition)
+          .forEach(baseFile -> {
             String path = baseFile.getPath();
             try {
               ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(path)).build();
@@ -565,7 +562,7 @@ public class TestData {
             } catch (IOException e) {
               throw new RuntimeException(e);
             }
-          }));
+          });
 
       assertTrue(partitionDataSet.size() == readBuffer.size() && partitionDataSet.containsAll(readBuffer));