Posted to commits@hudi.apache.org by vi...@apache.org on 2021/02/17 07:25:04 UTC

[hudi] branch master updated: [HUDI-1598] Write as minor batches during one checkpoint interval for the new writer (#2553)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d2491d  [HUDI-1598] Write as minor batches during one checkpoint interval for the new writer (#2553)
5d2491d is described below

commit 5d2491d10c70e4e5fc9b7aeb62cc64bcaaf6043f
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Feb 17 15:24:50 2021 +0800

    [HUDI-1598] Write as minor batches during one checkpoint interval for the new writer (#2553)
---
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  10 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  28 ++-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  44 ++++-
 ...ctory.java => ExplicitCreateHandleFactory.java} |  13 +-
 .../java/org/apache/hudi/io/FlinkCreateHandle.java | 148 +++++++++++++++
 .../java/org/apache/hudi/io/FlinkMergeHandle.java  | 202 +++++++++++++++++++++
 .../java/org/apache/hudi/io/MiniBatchHandle.java   |  30 +++
 .../hudi/table/ExplicitWriteHandleTable.java       | 128 +++++++++++++
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    | 114 +++++++++++-
 .../org/apache/hudi/table/HoodieFlinkTable.java    |   3 +-
 .../commit/BaseFlinkCommitActionExecutor.java      |  39 +++-
 .../commit/FlinkDeleteCommitActionExecutor.java    |  10 +-
 .../commit/FlinkInsertCommitActionExecutor.java    |   4 +-
 .../FlinkInsertPreppedCommitActionExecutor.java    |   4 +-
 .../hudi/table/action/commit/FlinkMergeHelper.java |   9 +-
 .../commit/FlinkUpsertCommitActionExecutor.java    |   4 +-
 .../FlinkUpsertPreppedCommitActionExecutor.java    |   4 +-
 .../org/apache/hudi/operator/FlinkOptions.java     |   6 +
 .../hudi/operator/KeyedWriteProcessFunction.java   |  10 +-
 .../apache/hudi/operator/StreamWriteFunction.java  | 114 +++++++++---
 .../operator/StreamWriteOperatorCoordinator.java   |  76 +++++---
 .../operator/event/BatchWriteSuccessEvent.java     |  51 +++++-
 .../operator/partitioner/BucketAssignFunction.java |   3 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |   7 +
 .../hudi/operator/StreamWriteFunctionTest.java     | 102 ++++++++++-
 .../StreamWriteOperatorCoordinatorTest.java        |  44 ++++-
 .../operator/utils/StreamWriteFunctionWrapper.java |   9 +-
 27 files changed, 1105 insertions(+), 111 deletions(-)
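
For context, a minimal sketch of how the new mini-batch write path is meant to be driven from the Flink side. The helper name writeCheckpointBatches and the variable bufferedMiniBatches are illustrative only (not part of this commit); the write client, the instant time and the per-bucket record batches are assumed to come from the Flink write function and coordinator. Each upsert call within a checkpoint interval reuses the write handle cached for the target file group, and cleanHandles() is invoked once the checkpoint interval is done:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;

    // Sketch only: assumes every record already carries its bucket location
    // (file id plus the "I" insert marker or an update marker) set upstream.
    static <T extends HoodieRecordPayload> List<WriteStatus> writeCheckpointBatches(
        HoodieFlinkWriteClient<T> writeClient,
        List<List<HoodieRecord<T>>> bufferedMiniBatches, // one list per file group
        String instantTime) {
      List<WriteStatus> statuses = new ArrayList<>();
      for (List<HoodieRecord<T>> miniBatch : bufferedMiniBatches) {
        // Each call looks up (or creates) the write handle for the batch's
        // file group, so the data is appended to the same file incrementally.
        statuses.addAll(writeClient.upsert(miniBatch, instantTime));
      }
      // Checkpoint complete: close the underlying file handles that were kept
      // open across the mini-batches of this interval.
      writeClient.cleanHandles();
      return statuses;
    }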

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 8fbd1be..357cf1b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -53,11 +53,11 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
 
   private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class);
 
-  private final HoodieFileWriter<IndexedRecord> fileWriter;
-  private final Path path;
-  private long recordsWritten = 0;
-  private long insertRecordsWritten = 0;
-  private long recordsDeleted = 0;
+  protected final HoodieFileWriter<IndexedRecord> fileWriter;
+  protected final Path path;
+  protected long recordsWritten = 0;
+  protected long insertRecordsWritten = 0;
+  protected long recordsDeleted = 0;
   private Map<String, HoodieRecord<T>> recordMap;
   private boolean useWriterSchema = false;
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index d5df2cb..0716050 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -95,10 +95,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
   protected HoodieFileWriter<IndexedRecord> fileWriter;
 
   protected Path newFilePath;
-  private Path oldFilePath;
+  protected Path oldFilePath;
   protected long recordsWritten = 0;
-  private long recordsDeleted = 0;
-  private long updatedRecordsWritten = 0;
+  protected long recordsDeleted = 0;
+  protected long updatedRecordsWritten = 0;
   protected long insertRecordsWritten = 0;
   protected boolean useWriterSchema;
   private HoodieBaseFile baseFileToMerge;
@@ -133,6 +133,13 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
   }
 
   /**
+   * Returns the data file name.
+   */
+  protected String generatesDataFileName() {
+    return FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
+  }
+
+  /**
    * Extract old file path, initialize StorageWriter and WriteStatus.
    */
   private void init(String fileId, String partitionPath, HoodieBaseFile baseFileToMerge) {
@@ -149,7 +156,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       partitionMetadata.trySave(getPartitionId());
 
       oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
-      String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
+      String newFileName = generatesDataFileName();
       String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
           + newFileName).toString();
       newFilePath = new Path(config.getBasePath(), relativePath);
@@ -177,18 +184,25 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
   }
 
   /**
-   * Load the new incoming records in a map and return partitionPath.
+   * Initialize a spillable map for incoming records.
    */
-  private void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
+  protected void initializeIncomingRecordsMap() {
     try {
       // Load the new records in a map
       long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
       LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
       this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
-          new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema));
+              new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema));
     } catch (IOException io) {
       throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
     }
+  }
+
+  /**
+   * Load the new incoming records into a map.
+   */
+  protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
+    initializeIncomingRecordsMap();
     while (newRecordsItr.hasNext()) {
       HoodieRecord<T> record = newRecordsItr.next();
       // update the new location of the record, so we know where to find it next
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 0c87f7d..9a7f5e8 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -39,6 +40,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndex;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.io.FlinkCreateHandle;
+import org.apache.hudi.io.FlinkMergeHandle;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.MiniBatchHandle;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -50,6 +55,7 @@ import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -58,12 +64,19 @@ import java.util.stream.Collectors;
 public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     AbstractHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
 
+  /**
+   * FileID to write handle mapping in order to record the write handles for each file group,
+   * so that we can append the mini-batch data buffer incrementally.
+   */
+  private Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
+
   public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
-    super(context, clientConfig);
+    this(context, clientConfig, false);
   }
 
   public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
     super(context, writeConfig, rollbackPending);
+    this.bucketToHandles = new HashMap<>();
   }
 
   public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
@@ -111,7 +124,23 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
         getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
     table.validateUpsertSchema();
     preWrite(instantTime, WriteOperationType.UPSERT);
-    HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
+    final HoodieRecord<T> record = records.get(0);
+    final HoodieRecordLocation loc = record.getCurrentLocation();
+    final String fileID = loc.getFileId();
+    final boolean isInsert = loc.getInstantTime().equals("I");
+    final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
+    if (bucketToHandles.containsKey(fileID)) {
+      writeHandle = bucketToHandles.get(fileID);
+    } else {
+      // create the write handle if it does not exist
+      writeHandle = isInsert
+          ? new FlinkCreateHandle<>(getConfig(), instantTime, table, record.getPartitionPath(),
+          fileID, table.getTaskContextSupplier())
+          : new FlinkMergeHandle<>(getConfig(), instantTime, table, records.listIterator(), record.getPartitionPath(),
+          fileID, table.getTaskContextSupplier());
+      bucketToHandles.put(fileID, writeHandle);
+    }
+    HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
     }
@@ -202,6 +231,17 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     return getTableAndInitCtx(metaClient, operationType);
   }
 
+  /**
+   * Clean the write handles within a checkpoint interval; this operation
+   * closes the underlying file handles.
+   */
+  public void cleanHandles() {
+    this.bucketToHandles.values().forEach(handle -> {
+      ((MiniBatchHandle) handle).finishWrite();
+    });
+    this.bucketToHandles.clear();
+  }
+
   private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
     if (operationType == WriteOperationType.DELETE) {
       setWriteSchemaForDeletes(metaClient);
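
The upsert path above keys its decision off the record location that is tagged before the write: an instant time marker of "I" means the bucket should be created (FlinkCreateHandle), anything else is treated as an existing bucket to merge into (FlinkMergeHandle). A hedged sketch of that pre-condition, using only the public HoodieRecord/HoodieRecordLocation API; in this commit the tagging is done by BucketAssignFunction, not by user code, and the helper name tagAsInsertBucket is hypothetical:

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordLocation;

    // Tag a record as belonging to a brand-new file group: "I" routes it to the
    // create handle; any other marker routes it to the merge handle instead.
    static void tagAsInsertBucket(HoodieRecord<?> record, String fileId) {
      record.unseal();
      record.setCurrentLocation(new HoodieRecordLocation("I", fileId));
      record.seal();
    }
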
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java
similarity index 78%
rename from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java
rename to hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java
index d65663e..f2847e2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java
@@ -24,18 +24,21 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
 /**
- * Create handle factory for Flink writer, use the specified fileID directly
- * because it is unique anyway.
+ * Create handle factory for Flink writer that uses the specified write handle directly.
  */
-public class FlinkCreateHandleFactory<T extends HoodieRecordPayload, I, K, O>
+public class ExplicitCreateHandleFactory<T extends HoodieRecordPayload, I, K, O>
     extends CreateHandleFactory<T, I, K, O> {
+  private HoodieWriteHandle<T, I, K, O> writeHandle;
+
+  public ExplicitCreateHandleFactory(HoodieWriteHandle<T, I, K, O> writeHandle) {
+    this.writeHandle = writeHandle;
+  }
 
   @Override
   public HoodieWriteHandle<T, I, K, O> create(
       HoodieWriteConfig hoodieConfig, String commitTime,
       HoodieTable<T, I, K, O> hoodieTable, String partitionPath,
       String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
-    return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
-        fileIdPrefix, taskContextSupplier);
+    return writeHandle;
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
new file mode 100644
index 0000000..07a7196
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieCreateHandle} that supports writing incrementally in mini-batches.
+ *
+ * <p>For the first mini-batch, it initializes and sets up the next file path to write to,
+ * but does not close the file writer until all the mini-batches finish writing. Each mini-batch
+ * is appended to the same file.
+ *
+ * @param <T> Payload type
+ * @param <I> Input type
+ * @param <K> Key type
+ * @param <O> Output type
+ */
+public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle {
+
+  private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class);
+  private long lastFileSize = 0L;
+
+  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                           String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
+    this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
+        taskContextSupplier);
+  }
+
+  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                           String partitionPath, String fileId, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
+                           TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, writerSchemaIncludingAndExcludingMetadataPair,
+        taskContextSupplier);
+  }
+
+  /**
+   * Called by the compactor code path.
+   */
+  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                           String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
+                           TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, recordMap, taskContextSupplier);
+  }
+
+  /**
+   * Get the incremental write status. In mini-batch write mode,
+   * this handle is reused for a checkpoint bucket (the bucket is appended to in mini-batches);
+   * thus, after a mini-batch append finishes, we do not close the underlying writer but return
+   * the incremental WriteStatus instead.
+   *
+   * @return the incremental write status
+   */
+  private WriteStatus getIncrementalWriteStatus() {
+    try {
+      long fileSizeInBytes = FSUtils.getFileSize(fs, path);
+      setUpWriteStatus(fileSizeInBytes);
+      // reset the write status
+      recordsWritten = 0;
+      recordsDeleted = 0;
+      insertRecordsWritten = 0;
+      this.lastFileSize = fileSizeInBytes;
+      writeStatus.setTotalErrorRecords(0);
+      return writeStatus;
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
+    }
+  }
+
+  /**
+   * Set up the write status.
+   *
+   * @param fileSizeInBytes File size in bytes
+   * @throws IOException if error occurs
+   */
+  private void setUpWriteStatus(long fileSizeInBytes) throws IOException {
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setPartitionPath(writeStatus.getPartitionPath());
+    stat.setNumWrites(recordsWritten);
+    stat.setNumDeletes(recordsDeleted);
+    stat.setNumInserts(insertRecordsWritten);
+    stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
+    stat.setFileId(writeStatus.getFileId());
+    stat.setPath(new Path(config.getBasePath()), path);
+    stat.setTotalWriteBytes(fileSizeInBytes - lastFileSize);
+    stat.setFileSizeInBytes(fileSizeInBytes);
+    stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
+    HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
+    runtimeStats.setTotalCreateTime(timer.endTimer());
+    stat.setRuntimeStats(runtimeStats);
+    timer = new HoodieTimer().startTimer();
+    writeStatus.setStat(stat);
+  }
+
+  public void finishWrite() {
+    LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
+    try {
+      fileWriter.close();
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
+    }
+  }
+
+  /**
+   * Performs actions to durably persist the current changes and returns a WriteStatus object.
+   */
+  @Override
+  public List<WriteStatus> close() {
+    return Collections.singletonList(getIncrementalWriteStatus());
+  }
+}
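
A short sketch of the lifecycle described in the class comment above: close() is called after every mini-batch and returns only the incremental WriteStatus (the delta against lastFileSize) while keeping the file writer open, and finishWrite() finally closes the file at the end of the checkpoint interval. The helper name createHandleLifecycle is hypothetical, and feeding records into the handle (the regular insert path) is elided:

    import java.util.List;

    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.io.FlinkCreateHandle;

    // Hedged sketch; in this commit these calls are issued by the Flink write
    // client and the commit action executor, not by user code.
    static void createHandleLifecycle(FlinkCreateHandle<?, ?, ?, ?> handle) {
      // After each mini-batch: incremental stats, the file writer stays open.
      List<WriteStatus> firstBatchStats = handle.close();
      List<WriteStatus> secondBatchStats = handle.close();
      // End of the checkpoint interval: close the underlying file writer.
      handle.finishWrite();
    }
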
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
new file mode 100644
index 0000000..cfd1729
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieMergeHandle} that supports merging incrementally in small data buffers.
+ *
+ * <p>For a new data buffer, it initializes and sets up the next file path to write to,
+ * and closes the file when the data buffer write finishes. When the next data buffer
+ * write starts, it rolls over to another new file. Once all the data buffers of a
+ * checkpoint round have been written, it renames the last new file path to the desired file name
+ * (the name with the expected file ID).
+ *
+ * @param <T> Payload type
+ * @param <I> Input type
+ * @param <K> Key type
+ * @param <O> Output type
+ */
+public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieMergeHandle<T, I, K, O>
+    implements MiniBatchHandle {
+
+  private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class);
+
+  /**
+   * Records the number of file handles that have rolled over.
+   */
+  private int rollNumber = 0;
+  /**
+   * Records the rolled over file paths.
+   */
+  private List<Path> rolloverPaths;
+  /**
+   * Whether it is the first time to generate a file handle, i.e. the handle has not rolled over yet.
+   */
+  private boolean needBootStrap = true;
+
+  public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                          Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+                          TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
+    rolloverPaths = new ArrayList<>();
+  }
+
+  /**
+   * Called by compactor code path.
+   */
+  public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                          Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+                          HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId,
+        dataFileToBeMerged, taskContextSupplier);
+  }
+
+  /**
+   * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
+   */
+  protected String generatesDataFileName() {
+    return FSUtils.makeDataFileName(instantTime, writeToken, fileId + "-" + rollNumber, hoodieTable.getBaseFileExtension());
+  }
+
+  public boolean isNeedBootStrap() {
+    return needBootStrap;
+  }
+
+  @Override
+  public List<WriteStatus> close() {
+    List<WriteStatus> writeStatus = super.close();
+    this.needBootStrap = false;
+    return writeStatus;
+  }
+
+  /**
+   * The difference from the parent method is that there is no need to set up
+   * the locations for the records.
+   *
+   * @param fileId        The file ID
+   * @param newRecordsItr The incremental records iterator
+   */
+  @Override
+  protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
+    initializeIncomingRecordsMap();
+    while (newRecordsItr.hasNext()) {
+      HoodieRecord<T> record = newRecordsItr.next();
+      // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist
+      keyToNewRecords.put(record.getRecordKey(), record);
+    }
+    LOG.info(String.format("Number of entries in MemoryBasedMap => %d\n"
+            + "Total size in bytes of MemoryBasedMap => %d\n"
+            + "Number of entries in DiskBasedMap => %d\n"
+            + "Size of file spilled to disk => %d",
+            ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries(),
+            ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize(),
+            ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries(),
+            ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()));
+  }
+
+  /**
+   *
+   * Rolls over the write handle to prepare for the next batch write.
+   *
+   * <p>It tweaks the handle state as follows:
+   *
+   * <ul>
+   *   <li>Increment the {@code rollNumber}</li>
+   *   <li>Book-keep the last file path; these files (except the last one) are temporary and need to be cleaned up</li>
+   *   <li>Mark the last new file path as the old one</li>
+   *   <li>Initialize the new file path and file writer</li>
+   * </ul>
+   *
+   * @param newRecordsItr The records iterator to update
+   */
+  public void rollOver(Iterator<HoodieRecord<T>> newRecordsItr) {
+    init(this.fileId, newRecordsItr);
+    this.recordsWritten = 0;
+    this.recordsDeleted = 0;
+    this.updatedRecordsWritten = 0;
+    this.insertRecordsWritten = 0;
+    this.writeStatus.setTotalErrorRecords(0);
+    this.timer = new HoodieTimer().startTimer();
+
+    rollNumber++;
+
+    rolloverPaths.add(newFilePath);
+    oldFilePath = newFilePath;
+    // Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
+    String newFileName = generatesDataFileName();
+    String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
+        + newFileName).toString();
+    newFilePath = new Path(config.getBasePath(), relativePath);
+
+    try {
+      fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error when creating file writer for path " + newFilePath, e);
+    }
+
+    LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
+        newFilePath.toString()));
+  }
+
+  public void finishWrite() {
+    for (int i = 0; i < rolloverPaths.size() - 1; i++) {
+      Path path = rolloverPaths.get(i);
+      try {
+        fs.delete(path, false);
+      } catch (IOException e) {
+        throw new HoodieIOException("Error when cleaning the temporary roll file: " + path, e);
+      }
+    }
+    Path lastPath = rolloverPaths.size() > 0
+        ? rolloverPaths.get(rolloverPaths.size() - 1)
+        : newFilePath;
+    String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
+    String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
+        + newFileName).toString();
+    final Path desiredPath = new Path(config.getBasePath(), relativePath);
+    try {
+      fs.rename(lastPath, desiredPath);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error when renaming the temporary roll file: " + lastPath + " to: " + desiredPath, e);
+    }
+  }
+}
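
The merge handle follows a different lifecycle: the first data buffer of a checkpoint bootstraps the handle and merges against the existing base file, each later buffer calls rollOver(...) so that the file produced by the previous buffer becomes the merge input of the next one, and finishWrite() cleans up the temporary rollover files and renames the remaining file to the final data file name. A hedged sketch with a hypothetical helper name (in this commit these calls are made by BaseFlinkCommitActionExecutor and FlinkMergeHelper, not by user code):

    import java.util.Iterator;

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.io.FlinkMergeHandle;

    static <T extends HoodieRecordPayload> void mergeHandleLifecycle(
        FlinkMergeHandle<T, ?, ?, ?> handle, Iterator<HoodieRecord<T>> laterBuffer) {
      if (!handle.isNeedBootStrap()) {
        // Not the first buffer of this checkpoint: the previously written file
        // becomes the "old" file and a new rollover file is opened for writing.
        handle.rollOver(laterBuffer);
      }
      // ... the merge runs and handle.close() returns this buffer's stats ...
      // At the end of the checkpoint interval:
      handle.finishWrite(); // clean up rollover files, rename to the final name
    }
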
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
new file mode 100644
index 0000000..2cae807
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+/**
+ * Hoodie write handle that supports writing in mini-batches.
+ */
+public interface MiniBatchHandle {
+  /**
+   * Finish the write of multiple mini-batches. Usually these mini-batches
+   * come from a checkpoint interval.
+   */
+  void finishWrite();
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
new file mode 100644
index 0000000..c0699ff
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+/**
+ * HoodieTable that needs the
+ * {@link org.apache.hudi.io.HoodieWriteHandle} to be passed in explicitly.
+ */
+public interface ExplicitWriteHandleTable<T extends HoodieRecordPayload> {
+  /**
+   * Upsert a batch of new records into Hoodie table at the supplied instantTime.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained control over
+   * the underlying file.
+   *
+   * @param context     HoodieEngineContext
+   * @param writeHandle The write handle
+   * @param instantTime Instant Time for the action
+   * @param records     hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  HoodieWriteMetadata<List<WriteStatus>> upsert(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> records);
+
+  /**
+   * Insert a batch of new records into Hoodie table at the supplied instantTime.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained control over
+   * the underlying file.
+   *
+   * @param context     HoodieEngineContext
+   * @param writeHandle The write handle
+   * @param instantTime Instant Time for the action
+   * @param records     hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  HoodieWriteMetadata<List<WriteStatus>> insert(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> records);
+
+  /**
+   * Deletes a list of {@link HoodieKey}s from the Hoodie table at the supplied instantTime. The {@link HoodieKey}s will be
+   * de-duped and non-existent keys will be removed before deleting.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained control over
+   * the underlying file.
+   *
+   * @param context     HoodieEngineContext
+   * @param writeHandle The write handle
+   * @param instantTime Instant Time for the action
+   * @param keys   {@link List} of {@link HoodieKey}s to be deleted
+   * @return HoodieWriteMetadata
+   */
+  HoodieWriteMetadata<List<WriteStatus>> delete(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieKey> keys);
+
+  /**
+   * Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
+   *
+   * <p>This implementation requires that the input records are already tagged, and de-duped if needed.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained control over
+   * the underlying file.
+   *
+   * @param context    HoodieEngineContext
+   * @param instantTime Instant Time for the action
+   * @param preppedRecords  hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> preppedRecords);
+
+  /**
+   * Inserts the given prepared records into the Hoodie table, at the supplied instantTime.
+   *
+   * <p>This implementation requires that the input records are already tagged, and de-duped if needed.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained control over
+   * the underlying file.
+   *
+   * @param context    HoodieEngineContext
+   * @param instantTime Instant Time for the action
+   * @param preppedRecords  hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> preppedRecords);
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 4d7edd7..ddc3fbe 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.clean.FlinkCleanActionExecutor;
@@ -61,14 +62,117 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
     super(config, context, metaClient);
   }
 
+  /**
+   * Upsert a batch of new records into Hoodie table at the supplied instantTime.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained control over
+   * the underlying file.
+   *
+   * @param context     HoodieEngineContext
+   * @param writeHandle The write handle
+   * @param instantTime Instant Time for the action
+   * @param records     hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<List<WriteStatus>> upsert(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> records) {
+    return new FlinkUpsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute();
+  }
+
+  /**
+   * Insert a batch of new records into Hoodie table at the supplied instantTime.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained control over
+   * the underlying file.
+   *
+   * @param context     HoodieEngineContext
+   * @param writeHandle The write handle
+   * @param instantTime Instant Time for the action
+   * @param records     hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<List<WriteStatus>> insert(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> records) {
+    return new FlinkInsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute();
+  }
+
+  /**
+   * Deletes a list of {@link HoodieKey}s from the Hoodie table at the supplied instantTime. The {@link HoodieKey}s will be
+   * de-duped and non-existent keys will be removed before deleting.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained control over
+   * the underlying file.
+   *
+   * @param context     HoodieEngineContext
+   * @param writeHandle The write handle
+   * @param instantTime Instant Time for the action
+   * @param keys   {@link List} of {@link HoodieKey}s to be deleted
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<List<WriteStatus>> delete(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieKey> keys) {
+    return new FlinkDeleteCommitActionExecutor<>(context, writeHandle, config, this, instantTime, keys).execute();
+  }
+
+  /**
+   * Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
+   *
+   * <p>This implementation requires that the input records are already tagged, and de-duped if needed.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained control over
+   * the underlying file.
+   *
+   * @param context    HoodieEngineContext
+   * @param instantTime Instant Time for the action
+   * @param preppedRecords  hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> preppedRecords) {
+    return new FlinkUpsertPreppedCommitActionExecutor<>(context, writeHandle, config, this, instantTime, preppedRecords).execute();
+  }
+
+  /**
+   * Inserts the given prepared records into the Hoodie table, at the supplied instantTime.
+   *
+   * <p>This implementation requires that the input records are already tagged, and de-duped if needed.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained control over
+   * the underlying file.
+   *
+   * @param context    HoodieEngineContext
+   * @param instantTime Instant Time for the action
+   * @param preppedRecords  hoodieRecords to upsert
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
+      HoodieEngineContext context,
+      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> preppedRecords) {
+    return new FlinkInsertPreppedCommitActionExecutor<>(context, writeHandle, config, this, instantTime, preppedRecords).execute();
+  }
+
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) {
-    return new FlinkUpsertCommitActionExecutor<>(context, config, this, instantTime, records).execute();
+    throw new HoodieNotSupportedException("This method should not be invoked");
   }
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> insert(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) {
-    return new FlinkInsertCommitActionExecutor<>(context, config, this, instantTime, records).execute();
+    throw new HoodieNotSupportedException("This method should not be invoked");
   }
 
   @Override
@@ -81,7 +185,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context, String instantTime, List<HoodieKey> keys) {
-    return new FlinkDeleteCommitActionExecutor<>(context, config, this, instantTime, keys).execute();
+    throw new HoodieNotSupportedException("This method should not be invoked");
   }
 
   @Override
@@ -91,12 +195,12 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) {
-    return new FlinkUpsertPreppedCommitActionExecutor<>(context, config, this, instantTime, preppedRecords).execute();
+    throw new HoodieNotSupportedException("This method should not be invoked");
   }
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> insertPrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) {
-    return new FlinkInsertPreppedCommitActionExecutor<>(context, config, this, instantTime, preppedRecords).execute();
+    throw new HoodieNotSupportedException("This method should not be invoked");
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index a8e6ed1..6b7c4a6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -36,7 +36,8 @@ import org.apache.hudi.index.HoodieIndex;
 import java.util.List;
 
 public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
-    extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+    extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
+    implements ExplicitWriteHandleTable<T> {
   protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     super(config, context, metaClient);
   }
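
Since HoodieFlinkTable now implements ExplicitWriteHandleTable, write actions on the Flink table go through the handle-aware overloads; the legacy overloads on HoodieFlinkCopyOnWriteTable throw HoodieNotSupportedException instead. A minimal call sketch with a hypothetical helper name (in this commit the actual call site is HoodieFlinkWriteClient#upsert):

    import java.util.List;

    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.io.HoodieWriteHandle;
    import org.apache.hudi.table.HoodieFlinkTable;
    import org.apache.hudi.table.action.HoodieWriteMetadata;

    static <T extends HoodieRecordPayload> HoodieWriteMetadata<List<WriteStatus>> upsertWithHandle(
        HoodieFlinkTable<T> table, HoodieEngineContext context,
        HoodieWriteHandle<?, ?, ?, ?> writeHandle, String instantTime,
        List<HoodieRecord<T>> records) {
      // The write handle is supplied explicitly so that consecutive mini-batches
      // of a checkpoint interval keep writing through the same handle.
      return table.upsert(context, writeHandle, instantTime, records);
    }
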
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index 044f841..9b6dcd6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -32,11 +32,14 @@ import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.FlinkLazyInsertIterable;
-import org.apache.hudi.io.FlinkCreateHandleFactory;
+import org.apache.hudi.io.ExplicitCreateHandleFactory;
+import org.apache.hudi.io.FlinkMergeHandle;
+import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
@@ -71,21 +74,26 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
 
   private static final Logger LOG = LogManager.getLogger(BaseFlinkCommitActionExecutor.class);
 
+  private HoodieWriteHandle<?, ?, ?, ?> writeHandle;
+
   public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
+                                       HoodieWriteHandle<?, ?, ?, ?> writeHandle,
                                        HoodieWriteConfig config,
                                        HoodieTable table,
                                        String instantTime,
                                        WriteOperationType operationType) {
-    super(context, config, table, instantTime, operationType, Option.empty());
+    this(context, writeHandle, config, table, instantTime, operationType, Option.empty());
   }
 
   public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
+                                       HoodieWriteHandle<?, ?, ?, ?> writeHandle,
                                        HoodieWriteConfig config,
                                        HoodieTable table,
                                        String instantTime,
                                        WriteOperationType operationType,
                                        Option extraMetadata) {
     super(context, config, table, instantTime, operationType, extraMetadata);
+    this.writeHandle = writeHandle;
   }
 
   @Override
@@ -182,6 +190,16 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
         case INSERT:
           return handleInsert(fileIdHint, recordItr);
         case UPDATE:
+          if (this.writeHandle instanceof HoodieCreateHandle) {
+            // During one checkpoint interval, an insert record could also be updated,
+            // for example, for an operation sequence of a record:
+            //    I, U,   | U, U
+            // - batch1 - | - batch2 -
+            // the first batch (batch1) operation triggers an INSERT bucket,
+            // the second batch (batch2) tries to reuse the same bucket
+            // and appends instead of updating.
+            return handleInsert(fileIdHint, recordItr);
+          }
           return handleUpdate(partitionPath, fileIdHint, recordItr);
         default:
           throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath);
@@ -203,7 +221,7 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
       return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
     }
     // these are updates
-    HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr);
+    HoodieMergeHandle upsertHandle = getUpdateHandle(recordItr);
     return handleUpdateInternal(upsertHandle, fileId);
   }
 
@@ -225,11 +243,16 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
     return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
   }
 
-  protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
+  protected FlinkMergeHandle getUpdateHandle(Iterator<HoodieRecord<T>> recordItr) {
     if (table.requireSortedRecords()) {
-      return new HoodieSortedMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
+      throw new HoodieNotSupportedException("Sorted records are not supported in Flink streaming write");
     } else {
-      return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
+      FlinkMergeHandle writeHandle = (FlinkMergeHandle) this.writeHandle;
+      // add the incremental records.
+      if (!writeHandle.isNeedBootStrap()) {
+        writeHandle.rollOver(recordItr);
+      }
+      return writeHandle;
     }
   }
 
@@ -242,6 +265,6 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
       return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
     }
     return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
-        taskContextSupplier, new FlinkCreateHandleFactory<>());
+        taskContextSupplier, new ExplicitCreateHandleFactory<>(writeHandle));
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteCommitActionExecutor.java
index 4b46f7f..2064be3 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
@@ -33,9 +34,12 @@ public class FlinkDeleteCommitActionExecutor<T extends HoodieRecordPayload<T>> e
   private final List<HoodieKey> keys;
 
   public FlinkDeleteCommitActionExecutor(HoodieEngineContext context,
-                                         HoodieWriteConfig config, HoodieTable table,
-                                         String instantTime, List<HoodieKey> keys) {
-    super(context, config, table, instantTime, WriteOperationType.DELETE);
+                                         HoodieWriteHandle<?, ?, ?, ?> writeHandle,
+                                         HoodieWriteConfig config,
+                                         HoodieTable table,
+                                         String instantTime,
+                                         List<HoodieKey> keys) {
+    super(context, writeHandle, config, table, instantTime, WriteOperationType.DELETE);
     this.keys = keys;
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertCommitActionExecutor.java
index e0c47db..0415983 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
@@ -34,11 +35,12 @@ public class FlinkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> e
   private List<HoodieRecord<T>> inputRecords;
 
   public FlinkInsertCommitActionExecutor(HoodieEngineContext context,
+                                         HoodieWriteHandle<?, ?, ?, ?> writeHandle,
                                          HoodieWriteConfig config,
                                          HoodieTable table,
                                          String instantTime,
                                          List<HoodieRecord<T>> inputRecords) {
-    super(context, config, table, instantTime, WriteOperationType.INSERT);
+    super(context, writeHandle, config, table, instantTime, WriteOperationType.INSERT);
     this.inputRecords = inputRecords;
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertPreppedCommitActionExecutor.java
index 8e53547..459a6db 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertPreppedCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
@@ -34,9 +35,10 @@ public class FlinkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayloa
   private final List<HoodieRecord<T>> preppedRecords;
 
   public FlinkInsertPreppedCommitActionExecutor(HoodieEngineContext context,
+                                                HoodieWriteHandle<?, ?, ?, ?> writeHandle,
                                                 HoodieWriteConfig config, HoodieTable table,
                                                 String instantTime, List<HoodieRecord<T>> preppedRecords) {
-    super(context, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
+    super(context, writeHandle, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
     this.preppedRecords = preppedRecords;
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index d34aca2..539f551 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.FlinkMergeHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -64,13 +65,15 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
                        HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> upsertHandle) throws IOException {
     final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
     Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
-    HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle = upsertHandle;
+    FlinkMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle =
+        (FlinkMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>) upsertHandle;
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
 
     final GenericDatumWriter<GenericRecord> gWriter;
     final GenericDatumReader<GenericRecord> gReader;
     Schema readSchema;
-    if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
+    if (mergeHandle.isNeedBootStrap()
+        && (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent())) {
       readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
       gWriter = new GenericDatumWriter<>(readSchema);
       gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields());
@@ -84,7 +87,7 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
     HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
     try {
       final Iterator<GenericRecord> readerIterator;
-      if (baseFile.getBootstrapBaseFile().isPresent()) {
+      if (mergeHandle.isNeedBootStrap() && baseFile.getBootstrapBaseFile().isPresent()) {
         readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
       } else {
         readerIterator = reader.getRecordIterator(readSchema);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java
index 7842e25..5859bb5 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
@@ -34,11 +35,12 @@ public class FlinkUpsertCommitActionExecutor<T extends HoodieRecordPayload<T>> e
   private List<HoodieRecord<T>> inputRecords;
 
   public FlinkUpsertCommitActionExecutor(HoodieEngineContext context,
+                                         HoodieWriteHandle<?, ?, ?, ?> writeHandle,
                                          HoodieWriteConfig config,
                                          HoodieTable table,
                                          String instantTime,
                                          List<HoodieRecord<T>> inputRecords) {
-    super(context, config, table, instantTime, WriteOperationType.UPSERT);
+    super(context, writeHandle, config, table, instantTime, WriteOperationType.UPSERT);
     this.inputRecords = inputRecords;
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertPreppedCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertPreppedCommitActionExecutor.java
index a6ecd93..42d932a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertPreppedCommitActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
@@ -34,9 +35,10 @@ public class FlinkUpsertPreppedCommitActionExecutor<T extends HoodieRecordPayloa
   private final List<HoodieRecord<T>> preppedRecords;
 
   public FlinkUpsertPreppedCommitActionExecutor(HoodieEngineContext context,
+                                                HoodieWriteHandle<?, ?, ?, ?> writeHandle,
                                                 HoodieWriteConfig config, HoodieTable table,
                                                 String instantTime, List<HoodieRecord<T>> preppedRecords) {
-    super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
+    super(context, writeHandle, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
     this.preppedRecords = preppedRecords;
   }
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
index 655fd1a..f163b02 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
@@ -159,6 +159,12 @@ public class FlinkOptions {
       .defaultValue(4)
       .withDescription("Parallelism of tasks that do actual write, default is 4");
 
+  public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
+      .key("write.batch.size.MB")
+      .doubleType()
+      .defaultValue(128D) // 128MB
+      .withDescription("Batch buffer size in MB to flush data into the underneath filesystem");
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
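A small usage sketch (not part of the commit) of tuning the new `write.batch.size.MB` option from a job, mirroring how the tests below configure it through Flink's Configuration; the 64 MB value is only an illustration:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.operator.FlinkOptions;

    public class WriteBatchSizeExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // flush mini-batches roughly every 64 MB instead of the 128 MB default
        conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 64D);
        System.out.println(conf.getDouble(FlinkOptions.WRITE_BATCH_SIZE));
      }
    }
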
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
index 4309bb0..d7c0256 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
@@ -34,6 +34,7 @@ import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -50,7 +51,9 @@ import java.util.Map;
 /**
  * A {@link KeyedProcessFunction} where the write operations really happens.
  */
-public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> implements CheckpointedFunction {
+public class KeyedWriteProcessFunction
+    extends KeyedProcessFunction<String, HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>>
+    implements CheckpointedFunction, CheckpointListener {
 
   private static final Logger LOG = LoggerFactory.getLogger(KeyedWriteProcessFunction.class);
   /**
@@ -160,6 +163,11 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
     putDataIntoBuffer(hoodieRecord);
   }
 
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    this.writeClient.cleanHandles();
+  }
+
   public boolean hasRecordsIn() {
     return hasRecordsIn;
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
index 5877098..ba6cea5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
@@ -25,6 +25,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
 import org.apache.hudi.table.action.commit.FlinkWriteHelper;
@@ -33,6 +34,7 @@ import org.apache.hudi.util.StreamerUtil;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -48,6 +50,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
@@ -58,33 +61,39 @@ import java.util.function.BiFunction;
  * <p><h2>Work Flow</h2>
  *
  * <p>The function firstly buffers the data as a batch of {@link HoodieRecord}s,
- * It flushes(write) the records batch when a Flink checkpoint starts. After a batch has been written successfully,
+ * It flushes (writes) the record batch when the batch size exceeds the configured threshold {@link FlinkOptions#WRITE_BATCH_SIZE}
+ * or when a Flink checkpoint starts. After a batch has been written successfully,
  * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
  *
- * <p><h2>Exactly-once Semantics</h2>
+ * <p><h2>Write Semantics</h2>
  *
  * <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
  * starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always
  * start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists.
- * The function process thread then block data buffering and the checkpoint thread starts flushing the existing data buffer.
- * When the existing data buffer write successfully, the process thread unblock and start buffering again for the next round checkpoint.
- * Because any checkpoint failures would trigger the write rollback, it implements the exactly-once semantics.
+ *
+ * <p>To improve throughput, the function's processing thread does not block data buffering
+ * while the checkpoint thread flushes the existing data buffer, so data belonging to the next checkpoint
+ * may be written under the current one. When a checkpoint failure triggers a write rollback, there may be some duplicate records
+ * (e.g. an eagerly written batch); the semantics are still correct when the UPSERT operation is used.
  *
  * <p><h2>Fault Tolerance</h2>
  *
- * <p>The operator coordinator checks the validity for the last instant when it starts a new one. The operator rolls back
- * the written data and throws when any error occurs. This means any checkpoint or task failure would trigger a failover.
- * The operator coordinator would try several times when committing the writestatus.
+ * <p>The operator coordinator checks and commits the last instant and then starts a new one when a checkpoint finishes successfully.
+ * The operator rolls back the written data and throws to trigger a failover when any error occurs.
+ * This means one Hoodie instant may span one or more checkpoints (some checkpoint notifications may be skipped).
+ * If a checkpoint times out, the next checkpoint helps to rewrite the remaining buffered data (the buffer is cleaned in the last
+ * step of the #flushBuffer method).
+ *
+ * <p>The operator coordinator retries several times when committing the write statuses.
  *
- * <p>Note: The function task requires the input stream be partitioned by the partition fields to avoid different write tasks
- * write to the same file group that conflict. The general case for partition path is a datetime field,
- * so the sink task is very possible to have IO bottleneck, the more flexible solution is to shuffle the
- * data by the file group IDs.
+ * <p>Note: The function task requires the input stream to be shuffled by the file IDs.
  *
  * @param <I> Type of the input record
  * @see StreamWriteOperatorCoordinator
  */
-public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O> implements CheckpointedFunction {
+public class StreamWriteFunction<K, I, O>
+    extends KeyedProcessFunction<K, I, O>
+    implements CheckpointedFunction, CheckpointListener {
 
   private static final long serialVersionUID = 1L;
 
@@ -138,9 +147,14 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
   private transient OperatorEventGateway eventGateway;
 
   /**
+   * The detector that decides whether to flush the data buffer as a mini-batch.
+   */
+  private transient BufferSizeDetector detector;
+
+  /**
    * Constructs a StreamingSinkFunction.
    *
-   * @param config  The config options
+   * @param config The config options
    */
   public StreamWriteFunction(Configuration config) {
     this.config = config;
@@ -149,6 +163,7 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
   @Override
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.detector = new BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE));
     initBuffer();
     initWriteClient();
     initWriteFunction();
@@ -166,11 +181,8 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
       // Based on the fact that the coordinator starts the checkpoint first,
       // it would check the validity.
       this.onCheckpointing = true;
-      this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
-      Preconditions.checkNotNull(this.currentInstant,
-          "No inflight instant when flushing data");
       // wait for the buffer data flush out and request a new instant
-      flushBuffer();
+      flushBuffer(true);
       // signal the task thread to start buffering
       addToBufferCondition.signal();
     } finally {
@@ -186,6 +198,7 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
       if (onCheckpointing) {
         addToBufferCondition.await();
       }
+      flushBufferOnCondition(value);
       putDataIntoBuffer(value);
     } finally {
       bufferLock.unlock();
@@ -199,6 +212,11 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
     }
   }
 
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {
+    this.writeClient.cleanHandles();
+  }
+
   // -------------------------------------------------------------------------
   //  Getter/Setter
   // -------------------------------------------------------------------------
@@ -252,6 +270,42 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
     }
   }
 
+  /**
+   * Tool to detect whether to flush out the existing buffer.
+   * Samples the records at a 1% rate to estimate the buffer size.
+   */
+  private static class BufferSizeDetector {
+    private final Random random = new Random(47);
+    private static final int DENOMINATOR = 100;
+
+    private final double batchSizeBytes;
+
+    private long lastRecordSize = -1L;
+    private long totalSize = 0L;
+
+    BufferSizeDetector(double batchSizeMb) {
+      this.batchSizeBytes = batchSizeMb * 1024 * 1024;
+    }
+
+    boolean detect(Object record) {
+      if (lastRecordSize == -1 || sampling()) {
+        lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
+      }
+      totalSize += lastRecordSize;
+      return totalSize > this.batchSizeBytes;
+    }
+
+    boolean sampling() {
+      // 1% sampling rate
+      return random.nextInt(DENOMINATOR) == 1;
+    }
+
+    void reset() {
+      this.lastRecordSize = -1L;
+      this.totalSize = 0L;
+    }
+  }
+
   private void putDataIntoBuffer(I value) {
     HoodieRecord<?> record = (HoodieRecord<?>) value;
     final String fileId = record.getCurrentLocation().getFileId();
@@ -262,8 +316,25 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
     this.buffer.get(key).add(record);
   }
 
+  /**
+   * Flush the data buffer if the buffer size is greater than
+   * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
+   *
+   * @param value HoodieRecord
+   */
+  private void flushBufferOnCondition(I value) {
+    boolean needFlush = this.detector.detect(value);
+    if (needFlush) {
+      flushBuffer(false);
+      this.detector.reset();
+    }
+  }
+
   @SuppressWarnings("unchecked, rawtypes")
-  private void flushBuffer() {
+  private void flushBuffer(boolean isFinalBatch) {
+    this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
+    Preconditions.checkNotNull(this.currentInstant,
+        "No inflight instant when flushing data");
     final List<WriteStatus> writeStatus;
     if (buffer.size() > 0) {
       writeStatus = new ArrayList<>();
@@ -278,12 +349,13 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
               writeStatus.addAll(writeFunction.apply(records, currentInstant));
             }
           });
-      this.buffer.clear();
     } else {
       LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
       writeStatus = Collections.emptyList();
     }
-    this.eventGateway.sendEventToCoordinator(new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus));
+    this.eventGateway.sendEventToCoordinator(
+        new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus, isFinalBatch));
+    this.buffer.clear();
     this.currentInstant = "";
   }
 }
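The BufferSizeDetector above estimates the buffer size by sampling roughly 1% of the incoming records instead of measuring every one. A standalone sketch (not part of the commit) of that idea follows; unlike the real class it takes record sizes as plain numbers instead of measuring them with Hudi's ObjectSizeCalculator, and the 424-byte record size is borrowed from the mini-batch test further below:

    import java.util.Random;

    public class SamplingSizeDetector {
      private static final int DENOMINATOR = 100;
      private final Random random = new Random(47);
      private final double batchSizeBytes;
      private long lastRecordSize = -1L;
      private long totalSize = 0L;

      public SamplingSizeDetector(double batchSizeMb) {
        this.batchSizeBytes = batchSizeMb * 1024 * 1024;
      }

      /** Returns true when the estimated buffer size crosses the threshold. */
      public boolean detect(long recordSizeBytes) {
        if (lastRecordSize == -1 || random.nextInt(DENOMINATOR) == 1) {
          lastRecordSize = recordSizeBytes; // re-sample roughly 1% of the time
        }
        totalSize += lastRecordSize;
        return totalSize > batchSizeBytes;
      }

      public void reset() {
        lastRecordSize = -1L;
        totalSize = 0L;
      }

      public static void main(String[] args) {
        SamplingSizeDetector detector = new SamplingSizeDetector(0.001); // ~1 KB threshold
        for (int i = 1; i <= 5; i++) {
          if (detector.detect(424)) { // 424-byte records, as in the mini-batch test
            System.out.println("flush mini-batch before record " + i);
            detector.reset();
          }
        }
      }
    }
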
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
index bf0cfc2..bd933c2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
@@ -80,12 +80,15 @@ public class StreamWriteOperatorCoordinator
    */
   private transient HoodieFlinkWriteClient writeClient;
 
+  /**
+   * Current data buffering checkpoint.
+   */
   private long inFlightCheckpoint = -1;
 
   /**
    * Current REQUESTED instant, for validation.
    */
-  private String inFlightInstant = "";
+  private String instant = "";
 
   /**
    * Event buffer for one round of checkpointing. When all the elements are non-null and have the same
@@ -119,6 +122,8 @@ public class StreamWriteOperatorCoordinator
     initWriteClient();
     // init table, create it if not exists.
     initTableIfNotExists(this.conf);
+    // start a new instant
+    startInstant();
   }
 
   @Override
@@ -132,20 +137,14 @@ public class StreamWriteOperatorCoordinator
   @Override
   public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
     try {
-      final String errMsg = "A new checkpoint starts while the last checkpoint buffer"
-          + " data has not finish writing, roll back the last write and throw";
-      checkAndForceCommit(errMsg);
-      this.inFlightInstant = this.writeClient.startCommit();
-      this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant);
       this.inFlightCheckpoint = checkpointId;
-      LOG.info("Create instant [{}], at checkpoint [{}]", this.inFlightInstant, checkpointId);
       result.complete(writeCheckpointBytes());
     } catch (Throwable throwable) {
       // when a checkpoint fails, throws directly.
       result.completeExceptionally(
           new CompletionException(
               String.format("Failed to checkpoint Instant %s for source %s",
-                  this.inFlightInstant, this.getClass().getSimpleName()), throwable));
+                  this.instant, this.getClass().getSimpleName()), throwable));
     }
   }
 
@@ -153,6 +152,15 @@ public class StreamWriteOperatorCoordinator
   public void checkpointComplete(long checkpointId) {
     // start to commit the instant.
     checkAndCommitWithRetry();
+    // start new instant.
+    startInstant();
+  }
+
+  private void startInstant() {
+    this.instant = this.writeClient.startCommit();
+    this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.instant);
+    LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
+            this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
   }
 
   public void notifyCheckpointAborted(long checkpointId) {
@@ -175,10 +183,14 @@ public class StreamWriteOperatorCoordinator
     Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
         "The coordinator can only handle BatchWriteSuccessEvent");
     BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
-    Preconditions.checkState(event.getInstantTime().equals(this.inFlightInstant),
+    Preconditions.checkState(event.getInstantTime().equals(this.instant),
         String.format("Receive an unexpected event for instant %s from task %d",
             event.getInstantTime(), event.getTaskID()));
-    this.eventBuffer[event.getTaskID()] = event;
+    if (this.eventBuffer[event.getTaskID()] != null) {
+      this.eventBuffer[event.getTaskID()].mergeWith(event);
+    } else {
+      this.eventBuffer[event.getTaskID()] = event;
+    }
   }
 
   @Override
@@ -218,7 +230,7 @@ public class StreamWriteOperatorCoordinator
          DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
 
       out.writeLong(this.inFlightCheckpoint);
-      byte[] serializedInstant = this.inFlightInstant.getBytes();
+      byte[] serializedInstant = this.instant.getBytes();
       out.writeInt(serializedInstant.length);
       out.write(serializedInstant);
       out.flush();
@@ -239,12 +251,12 @@ public class StreamWriteOperatorCoordinator
       int serializedInstantSize = in.readInt();
       byte[] serializedInstant = readBytes(in, serializedInstantSize);
       this.inFlightCheckpoint = checkpointID;
-      this.inFlightInstant = new String(serializedInstant);
+      this.instant = new String(serializedInstant);
     }
   }
 
   private void reset() {
-    this.inFlightInstant = "";
+    this.instant = "";
     this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism];
   }
 
@@ -253,8 +265,8 @@ public class StreamWriteOperatorCoordinator
       // forced but still has inflight instant
       String inflightInstant = writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE));
       if (inflightInstant != null) {
-        assert inflightInstant.equals(this.inFlightInstant);
-        writeClient.rollback(this.inFlightInstant);
+        assert inflightInstant.equals(this.instant);
+        writeClient.rollback(this.instant);
         throw new HoodieException(errMsg);
       }
       if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
@@ -277,6 +289,10 @@ public class StreamWriteOperatorCoordinator
         if (!checkReady()) {
           // Do not throw if the try times expires but the event buffer are still not ready,
           // because we have a force check when next checkpoint starts.
+          if (tryTimes == retryTimes) {
+            // Throw if the retries are exhausted but the event buffer is still not ready
+            throw new HoodieException("Try " + retryTimes + " to commit instant [" + this.instant + "] failed");
+          }
           sleepFor(retryIntervalMillis);
           continue;
         }
@@ -284,9 +300,9 @@ public class StreamWriteOperatorCoordinator
         return;
       } catch (Throwable throwable) {
         String cause = throwable.getCause() == null ? "" : throwable.getCause().toString();
-        LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.inFlightInstant, tryTimes, cause);
+        LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.instant, tryTimes, cause);
         if (tryTimes == retryTimes) {
-          throw new HoodieException(throwable);
+          throw new HoodieException("Not all write tasks finish the batch write to commit", throwable);
         }
         sleepFor(retryIntervalMillis);
       }
@@ -307,8 +323,8 @@ public class StreamWriteOperatorCoordinator
 
   /** Checks the buffer is ready to commit. */
   private boolean checkReady() {
-    return Arrays.stream(eventBuffer).allMatch(event ->
-        event != null && event.getInstantTime().equals(this.inFlightInstant));
+    return Arrays.stream(eventBuffer)
+        .allMatch(event -> event != null && event.isReady(this.instant));
   }
 
   /** Performs the actual commit action. */
@@ -320,7 +336,7 @@ public class StreamWriteOperatorCoordinator
 
     if (writeResults.size() == 0) {
       // No data has written, clear the metadata file
-      this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant);
+      this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.instant);
       reset();
       return;
     }
@@ -337,12 +353,12 @@ public class StreamWriteOperatorCoordinator
             + totalErrorRecords + "/" + totalRecords);
       }
 
-      boolean success = writeClient.commit(this.inFlightInstant, writeResults, Option.of(checkpointCommitMetadata));
+      boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata));
       if (success) {
         reset();
-        LOG.info("Commit instant [{}] success!", this.inFlightInstant);
+        LOG.info("Commit instant [{}] success!", this.instant);
       } else {
-        throw new HoodieException(String.format("Commit instant [%s] failed!", this.inFlightInstant));
+        throw new HoodieException(String.format("Commit instant [%s] failed!", this.instant));
       }
     } else {
       LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
@@ -355,8 +371,8 @@ public class StreamWriteOperatorCoordinator
         }
       });
       // Rolls back instant
-      writeClient.rollback(this.inFlightInstant);
-      throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.inFlightInstant));
+      writeClient.rollback(this.instant);
+      throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.instant));
     }
   }
 
@@ -366,8 +382,14 @@ public class StreamWriteOperatorCoordinator
   }
 
   @VisibleForTesting
-  public String getInFlightInstant() {
-    return inFlightInstant;
+  public String getInstant() {
+    return instant;
+  }
+
+  @VisibleForTesting
+  @SuppressWarnings("rawtypes")
+  public HoodieFlinkWriteClient getWriteClient() {
+    return writeClient;
   }
 
   /**
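The coordinator's checkAndCommitWithRetry loop above waits for every write task to report its final batch, retrying a bounded number of times and failing on the last attempt. A simplified, dependency-free sketch of that pattern (not the coordinator's actual code; the helper and exception type are illustrative):

    import java.util.function.BooleanSupplier;

    public class CommitRetryExample {
      static void commitWithRetry(int retryTimes, long retryIntervalMillis,
                                  BooleanSupplier ready, Runnable commit) {
        int tryTimes = 0;
        while (tryTimes++ < retryTimes) {
          if (!ready.getAsBoolean()) {
            if (tryTimes == retryTimes) {
              // give up: not every task has sent its last-batch event
              throw new IllegalStateException("Try " + retryTimes + " to commit failed");
            }
            sleep(retryIntervalMillis);
            continue;
          }
          commit.run();
          return;
        }
      }

      private static void sleep(long millis) {
        try {
          Thread.sleep(millis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }

      public static void main(String[] args) {
        commitWithRetry(3, 10, () -> true, () -> System.out.println("committed"));
      }
    }
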
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java b/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java
index db5432e..f03e8d3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java
@@ -21,7 +21,9 @@ package org.apache.hudi.operator.event;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.util.ValidationUtils;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -30,17 +32,38 @@ import java.util.List;
 public class BatchWriteSuccessEvent implements OperatorEvent {
   private static final long serialVersionUID = 1L;
 
-  private final List<WriteStatus> writeStatuses;
+  private List<WriteStatus> writeStatuses;
   private final int taskID;
   private final String instantTime;
+  private boolean isLastBatch;
 
   public BatchWriteSuccessEvent(
       int taskID,
       String instantTime,
       List<WriteStatus> writeStatuses) {
+    this(taskID, instantTime, writeStatuses, false);
+  }
+
+  /**
+   * Creates an event.
+   *
+   * @param taskID        The task ID
+   * @param instantTime   The instant time under which to write the data
+   * @param writeStatuses The write statuses list
+   * @param isLastBatch   Whether the event reports the last batch
+   *                      within a checkpoint interval;
+   *                      if true, the whole data set of the checkpoint
+   *                      has been flushed successfully
+   */
+  public BatchWriteSuccessEvent(
+      int taskID,
+      String instantTime,
+      List<WriteStatus> writeStatuses,
+      boolean isLastBatch) {
     this.taskID = taskID;
     this.instantTime = instantTime;
-    this.writeStatuses = writeStatuses;
+    this.writeStatuses = new ArrayList<>(writeStatuses);
+    this.isLastBatch = isLastBatch;
   }
 
   public List<WriteStatus> getWriteStatuses() {
@@ -54,4 +77,28 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
   public String getInstantTime() {
     return instantTime;
   }
+
+  public boolean isLastBatch() {
+    return isLastBatch;
+  }
+
+  /**
+   * Merges this event with the given {@link BatchWriteSuccessEvent} {@code other}.
+   *
+   * @param other The event to be merged
+   */
+  public void mergeWith(BatchWriteSuccessEvent other) {
+    ValidationUtils.checkArgument(this.instantTime.equals(other.instantTime));
+    ValidationUtils.checkArgument(this.taskID == other.taskID);
+    this.isLastBatch |= other.isLastBatch; // true if either event has isLastBatch set to true.
+    List<WriteStatus> statusList = new ArrayList<>();
+    statusList.addAll(this.writeStatuses);
+    statusList.addAll(other.writeStatuses);
+    this.writeStatuses = statusList;
+  }
+
+  /** Returns whether the event is ready to commit. */
+  public boolean isReady(String currentInstant) {
+    return isLastBatch && this.instantTime.equals(currentInstant);
+  }
 }
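With mini-batches, one task may emit several events per checkpoint; the coordinator merges them and only commits once a merged event reports the last batch. A small usage sketch of the API added above (not from the commit; the instant time string is made up for illustration):

    import java.util.Collections;
    import org.apache.hudi.operator.event.BatchWriteSuccessEvent;

    public class EventMergeExample {
      public static void main(String[] args) {
        String instant = "20210217152450";
        // mini-batch flushed in the middle of the checkpoint interval
        BatchWriteSuccessEvent miniBatch =
            new BatchWriteSuccessEvent(0, instant, Collections.emptyList(), false);
        // final flush triggered by the checkpoint itself
        BatchWriteSuccessEvent lastBatch =
            new BatchWriteSuccessEvent(0, instant, Collections.emptyList(), true);

        miniBatch.mergeWith(lastBatch);                 // write statuses are concatenated
        System.out.println(miniBatch.isReady(instant)); // true: last batch seen for this instant
      }
    }
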
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
index 269ccc8..590ee3b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
@@ -89,7 +89,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
 
   @Override
   public void snapshotState(FunctionSnapshotContext context) {
-    this.bucketAssigner.reset();
+    // no operation
   }
 
   @Override
@@ -144,6 +144,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
   @Override
   public void notifyCheckpointComplete(long l) {
     // Refresh the table state when there are new commits.
+    this.bucketAssigner.reset();
     this.bucketAssigner.refreshTable();
   }
 }
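The bucket assigner reset moves from snapshotState to notifyCheckpointComplete, presumably so the assignments survive until the instant backing the checkpoint has actually committed. A minimal sketch (not from the commit) of that callback contract, combining Flink's CheckpointedFunction and CheckpointListener; the class and its cached field are illustrative:

    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    public class ResetOnCompleteFunction implements CheckpointedFunction, CheckpointListener {
      private transient StringBuilder cache = new StringBuilder();

      @Override
      public void initializeState(FunctionInitializationContext context) {
        this.cache = new StringBuilder();
      }

      @Override
      public void snapshotState(FunctionSnapshotContext context) {
        // no operation: keep the cached state while the checkpoint is merely being taken
      }

      @Override
      public void notifyCheckpointComplete(long checkpointId) {
        // the checkpoint (and its instant) has completed, safe to drop the cache
        cache.setLength(0);
      }
    }
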
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 4447705..1c40284 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
@@ -53,6 +54,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.List;
+import java.util.Objects;
 import java.util.Properties;
 
 /**
@@ -293,4 +295,9 @@ public class StreamerUtil {
   public static String generateBucketKey(String partitionPath, String fileId) {
     return String.format("%s_%s", partitionPath, fileId);
   }
+
+  /** Returns whether the location represents an insert. */
+  public static boolean isInsert(HoodieRecordLocation loc) {
+    return Objects.equals(loc.getInstantTime(), "I");
+  }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
index fea9b8f..12a00dd 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
@@ -20,6 +20,7 @@ package org.apache.hudi.operator;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
@@ -150,7 +151,8 @@ public class StreamWriteFunctionTest {
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
 
     funcWrapper.checkpointComplete(2);
-    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null);
+    // started a new instant already
+    checkInflightInstant(funcWrapper.getWriteClient());
     checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
   }
 
@@ -187,12 +189,13 @@ public class StreamWriteFunctionTest {
       funcWrapper.invoke(rowData);
     }
 
-    // this triggers the data write and event send
-    funcWrapper.checkpointFunction(2);
-    // Do not sent the write event and fails the checkpoint
-    assertThrows(HoodieException.class,
-        () -> funcWrapper.checkpointFails(2),
-        "The last checkpoint was aborted, roll back the last write and throw");
+    // this triggers an NPE because there is no inflight instant
+    assertThrows(NullPointerException.class,
+        () -> funcWrapper.checkpointFunction(2),
+        "No inflight instant when flushing data");
+    // do not send the write event and fail the checkpoint;
+    // behaves as if the last checkpoint was successful.
+    funcWrapper.checkpointFails(2);
   }
 
   @Test
@@ -212,13 +215,13 @@ public class StreamWriteFunctionTest {
 
     final OperatorEvent nextEvent = funcWrapper.getNextEvent();
     assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
-    checkWrittenData(tempFile, EXPECTED1);
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
 
     checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
     funcWrapper.checkpointComplete(1);
+    checkWrittenData(tempFile, EXPECTED1);
     // the coordinator checkpoint commits the inflight instant.
     checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
     checkWrittenData(tempFile, EXPECTED1);
@@ -241,15 +244,16 @@ public class StreamWriteFunctionTest {
     // this triggers the data write and event send
     funcWrapper.checkpointFunction(1);
 
-    final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    OperatorEvent nextEvent = funcWrapper.getNextEvent();
     assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
-    checkWrittenData(tempFile, EXPECTED3, 1);
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
 
     funcWrapper.checkpointComplete(1);
 
+    checkWrittenData(tempFile, EXPECTED3, 1);
+
     // insert duplicates again
     for (RowData rowData : TestData.DATA_SET_THREE) {
       funcWrapper.invoke(rowData);
@@ -257,6 +261,10 @@ public class StreamWriteFunctionTest {
 
     funcWrapper.checkpointFunction(2);
 
+    nextEvent = funcWrapper.getNextEvent();
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    funcWrapper.checkpointComplete(2);
+
     checkWrittenData(tempFile, EXPECTED3, 1);
   }
 
@@ -306,11 +314,85 @@ public class StreamWriteFunctionTest {
     checkWrittenData(tempFile, EXPECTED2);
   }
 
+  @Test
+  public void testInsertWithMiniBatches() throws Exception {
+    // reset the config option
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    // Each record is about 424 bytes, so the third record is expected to trigger a mini-batch write
+    for (RowData rowData : TestData.DATA_SET_THREE) {
+      funcWrapper.invoke(rowData);
+    }
+
+    Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
+    assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
+    assertThat("2 records expect to flush out as a mini-batch",
+        dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
+        is(3));
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+    assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+
+    final OperatorEvent event1 = funcWrapper.getNextEvent(); // consume the mini-batch event first
+    final OperatorEvent event2 = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    String instant = funcWrapper.getWriteClient()
+        .getInflightAndRequestedInstant("COPY_ON_WRITE");
+
+    funcWrapper.checkpointComplete(1);
+
+    Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, "
+        + "id1,par1,id1,Danny,23,1,par1, "
+        + "id1,par1,id1,Danny,23,1,par1, "
+        + "id1,par1,id1,Danny,23,1,par1, "
+        + "id1,par1,id1,Danny,23,1,par1]");
+    checkWrittenData(tempFile, expected, 1);
+
+    // started a new instant already
+    checkInflightInstant(funcWrapper.getWriteClient());
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+
+    // insert duplicates again
+    for (RowData rowData : TestData.DATA_SET_THREE) {
+      funcWrapper.invoke(rowData);
+    }
+
+    funcWrapper.checkpointFunction(2);
+
+    final OperatorEvent event3 = funcWrapper.getNextEvent(); // consume the mini-batch event first
+    final OperatorEvent event4 = funcWrapper.getNextEvent();
+    final OperatorEvent event5 = funcWrapper.getNextEvent();
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event5);
+    funcWrapper.checkpointComplete(2);
+
+    // Same as the original base file content.
+    checkWrittenData(tempFile, expected, 1);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
 
   @SuppressWarnings("rawtypes")
+  private void checkInflightInstant(HoodieFlinkWriteClient writeClient) {
+    final String instant = writeClient.getInflightAndRequestedInstant("COPY_ON_WRITE");
+    assertNotNull(instant);
+  }
+
+  @SuppressWarnings("rawtypes")
   private void checkInstantState(
       HoodieFlinkWriteClient writeClient,
       HoodieInstant.State state,
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java
index c533b48..732cc65 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java
@@ -18,8 +18,11 @@
 
 package org.apache.hudi.operator;
 
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
 import org.apache.hudi.operator.utils.TestConfigurations;
 import org.apache.hudi.util.StreamerUtil;
@@ -37,6 +40,9 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -62,6 +68,34 @@ public class StreamWriteOperatorCoordinatorTest {
   }
 
   @Test
+  void testInstantState() {
+    String instant = coordinator.getInstant();
+    assertNotEquals("", instant);
+
+    WriteStatus writeStatus = new WriteStatus(true, 0.1D);
+    writeStatus.setPartitionPath("par1");
+    writeStatus.setStat(new HoodieWriteStat());
+    OperatorEvent event0 =
+        new BatchWriteSuccessEvent(0, instant, Collections.singletonList(writeStatus), true);
+
+    WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
+    writeStatus1.setPartitionPath("par2");
+    writeStatus1.setStat(new HoodieWriteStat());
+    OperatorEvent event1 =
+        new BatchWriteSuccessEvent(1, instant, Collections.singletonList(writeStatus1), true);
+    coordinator.handleEventFromOperator(0, event0);
+    coordinator.handleEventFromOperator(1, event1);
+
+    coordinator.checkpointComplete(1);
+    String inflight = coordinator.getWriteClient()
+        .getInflightAndRequestedInstant("COPY_ON_WRITE");
+    String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant("COPY_ON_WRITE");
+    assertThat("Instant should be complete", lastCompleted, is(instant));
+    assertNotEquals("", inflight, "Should start a new instant");
+    assertNotEquals(instant, inflight, "Should start a new instant");
+  }
+
+  @Test
   public void testTableInitialized() throws IOException {
     final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
     String basePath = tempFile.getAbsolutePath();
@@ -88,14 +122,14 @@ public class StreamWriteOperatorCoordinatorTest {
   }
 
   @Test
-  public void testCheckpointInvalid() {
+  public void testCheckpointCompleteWithRetry() {
     final CompletableFuture<byte[]> future = new CompletableFuture<>();
     coordinator.checkpointCoordinator(1, future);
-    String inflightInstant = coordinator.getInFlightInstant();
+    String inflightInstant = coordinator.getInstant();
     OperatorEvent event = new BatchWriteSuccessEvent(0, inflightInstant, Collections.emptyList());
     coordinator.handleEventFromOperator(0, event);
-    final CompletableFuture<byte[]> future2 = new CompletableFuture<>();
-    coordinator.checkpointCoordinator(2, future2);
-    assertTrue(future2.isCompletedExceptionally());
+    assertThrows(HoodieException.class,
+        () -> coordinator.checkpointComplete(1),
+        "Try 3 to commit instant");
   }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
index 59de283..6f7062c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
@@ -38,6 +38,8 @@ import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventG
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Collector;
 
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -77,11 +79,11 @@ public class StreamWriteFunctionWrapper<I> {
     this.conf = conf;
     // one function
     this.coordinator = new StreamWriteOperatorCoordinator(conf, 1);
-    this.coordinator.start();
     this.functionInitializationContext = new MockFunctionInitializationContext();
   }
 
   public void openFunction() throws Exception {
+    this.coordinator.start();
     toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
     toHoodieFunction.setRuntimeContext(runtimeContext);
     toHoodieFunction.open(conf);
@@ -123,6 +125,10 @@ public class StreamWriteFunctionWrapper<I> {
     return this.gateway.getNextEvent();
   }
 
+  public Map<String, List<HoodieRecord>> getDataBuffer() {
+    return this.writeFunction.getBuffer();
+  }
+
   @SuppressWarnings("rawtypes")
   public HoodieFlinkWriteClient getWriteClient() {
     return this.writeFunction.getWriteClient();
@@ -141,6 +147,7 @@ public class StreamWriteFunctionWrapper<I> {
     functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
     coordinator.checkpointComplete(checkpointId);
     this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
+    this.writeFunction.notifyCheckpointComplete(checkpointId);
   }
 
   public void checkpointFails(long checkpointId) {