Posted to commits@hudi.apache.org by vi...@apache.org on 2021/05/14 02:25:36 UTC

[hudi] branch master updated: [HUDI-1900] Always close the file handle for a flink mini-batch write (#2943)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ad77cf4  [HUDI-1900] Always close the file handle for a flink mini-batch write (#2943)
ad77cf4 is described below

commit ad77cf42ba7d8eab086be1f6d91f465bf29164ed
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri May 14 10:25:18 2021 +0800

    [HUDI-1900] Always close the file handle for a flink mini-batch write (#2943)
    
    Close the file handle eagerly to avoid corrupted files as much as
    possible.
---
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  13 +-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  19 +-
 .../java/org/apache/hudi/io/FlinkAppendHandle.java |  68 +++----
 .../java/org/apache/hudi/io/FlinkCreateHandle.java |  79 ++-------
 .../apache/hudi/io/FlinkMergeAndReplaceHandle.java | 196 +++++++++++++++++++++
 .../java/org/apache/hudi/io/FlinkMergeHandle.java  |  92 +++-------
 .../java/org/apache/hudi/io/MiniBatchHandle.java   |  30 ++--
 .../commit/BaseFlinkCommitActionExecutor.java      |  58 +++---
 .../hudi/table/action/commit/FlinkMergeHelper.java |  22 +--
 .../delta/BaseFlinkDeltaCommitActionExecutor.java  |   3 -
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  26 +--
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |   5 +-
 .../sink/utils/StreamWriteFunctionWrapper.java     |   1 +
 13 files changed, 338 insertions(+), 274 deletions(-)
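
The core pattern of this change: every mini-batch now closes its file handle eagerly inside close(), and closeGracefully() becomes an idempotent safety net guarded by an isClosed flag. A minimal stand-alone sketch of that pattern, using plain JDK I/O and hypothetical names rather than the Hudi classes:

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    class EagerCloseHandle {
      private final BufferedWriter writer;
      private boolean isClosed = false;

      EagerCloseHandle(Path file) throws IOException {
        this.writer = Files.newBufferedWriter(file);
      }

      void write(String record) throws IOException {
        writer.write(record);
        writer.newLine();
      }

      // Close eagerly at the end of every mini-batch and remember that we did.
      void close() throws IOException {
        try {
          writer.close();
        } finally {
          isClosed = true;
        }
      }

      // Safe to call any number of times; only attempts cleanup if close() never succeeded.
      void closeGracefully() {
        if (isClosed) {
          return;
        }
        try {
          close();
        } catch (Throwable t) {
          System.err.println("Error while disposing the handle: " + t.getMessage());
        }
      }
    }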

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index d0778f7..9800749 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -106,9 +106,16 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
   public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                            Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                            TaskContextSupplier taskContextSupplier) {
+    this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
+        hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
+  }
+
+  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                           Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+                           TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile) {
     super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     init(fileId, recordItr);
-    init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
+    init(fileId, partitionPath, baseFile);
   }
 
   /**
@@ -173,6 +180,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }
   }
 
+  protected void setWriteStatusPath() {
+    writeStatus.getStat().setPath(new Path(config.getBasePath()), newFilePath);
+  }
+
   protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
     oldFilePath = makeNewFilePath(partitionPath, oldFileName);
     newFilePath = makeNewFilePath(partitionPath, newFileName);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 6cbbeec..9ba0961 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -47,6 +47,7 @@ import org.apache.hudi.index.FlinkHoodieIndex;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.FlinkAppendHandle;
 import org.apache.hudi.io.FlinkCreateHandle;
+import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
 import org.apache.hudi.io.FlinkMergeHandle;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.io.MiniBatchHandle;
@@ -376,12 +377,10 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }
 
   /**
-   * Clean the write handles within a checkpoint interval, this operation
-   * would close the underneath file handles.
+   * Clean the write handles within a checkpoint interval.
+   * All the handles should have been closed already.
    */
   public void cleanHandles() {
-    this.bucketToHandles.values()
-        .forEach(handle -> ((MiniBatchHandle) handle).finishWrite());
     this.bucketToHandles.clear();
   }
 
@@ -414,10 +413,18 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
       Iterator<HoodieRecord<T>> recordItr) {
     final HoodieRecordLocation loc = record.getCurrentLocation();
     final String fileID = loc.getFileId();
+    final String partitionPath = record.getPartitionPath();
     if (bucketToHandles.containsKey(fileID)) {
-      return bucketToHandles.get(fileID);
+      MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
+      if (lastHandle.shouldReplace()) {
+        HoodieWriteHandle<?, ?, ?, ?> writeHandle = new FlinkMergeAndReplaceHandle<>(
+            config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(),
+            lastHandle.getWritePath());
+        this.bucketToHandles.put(fileID, writeHandle); // override with new replace handle
+        return writeHandle;
+      }
     }
-    final String partitionPath = record.getPartitionPath();
+
     final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
     final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
     if (isDelta) {
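
getOrCreateWriteHandle now keeps the already-closed handle in bucketToHandles only to remember its write path: when the cached handle reports shouldReplace(), the next mini-batch for that file ID gets a fresh merge-and-replace handle pointed at that path, otherwise a brand-new handle is created. A rough sketch of that dispatch, with hypothetical types standing in for the Hudi handles:

    import java.nio.file.Path;
    import java.util.HashMap;
    import java.util.Map;

    interface BatchHandle {
      Path getWritePath();
      default boolean shouldReplace() { return true; } // base files need replacement, log files do not
    }

    class HandleRegistry {
      private final Map<String, BatchHandle> bucketToHandles = new HashMap<>();

      BatchHandle getOrCreate(String fileId, HandleFactory factory) {
        BatchHandle last = bucketToHandles.get(fileId);
        if (last != null && last.shouldReplace()) {
          // merge the new buffer with the file written by the previous mini-batch
          BatchHandle replace = factory.mergeAndReplace(fileId, last.getWritePath());
          bucketToHandles.put(fileId, replace);
          return replace;
        }
        BatchHandle fresh = factory.create(fileId);
        bucketToHandles.put(fileId, fresh);
        return fresh;
      }

      void cleanHandles() {
        bucketToHandles.clear(); // all handles were already closed per mini-batch
      }

      interface HandleFactory {
        BatchHandle create(String fileId);
        BatchHandle mergeAndReplace(String fileId, Path lastWritePath);
      }
    }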
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index c859d26..987f335 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -23,36 +23,32 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
 
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
 /**
- * A {@link HoodieAppendHandle} that supports append write incrementally(mini-batches).
+ * A {@link HoodieAppendHandle} that supports APPEND write incrementally (mini-batches).
  *
- * <p>For the first mini-batch, it initialize and set up the next file path to write,
- * but does not close the file writer until all the mini-batches write finish. Each mini-batch
- * data are appended to this handle, the back-up writer may rollover on condition.
+ * <p>For the first mini-batch, it initializes and sets up the next file path to write,
+ * then closes the file writer. The subsequent mini-batches are appended to the same file
+ * through a different append handle with the same write file name.
  *
- * @param <T> Payload type
- * @param <I> Input type
- * @param <K> Key type
- * @param <O> Output type
+ * <p>The back-up writer may roll over under certain conditions (e.g. the filesystem does not
+ * support append, or the file size hits the configured threshold).
  */
 public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
     extends HoodieAppendHandle<T, I, K, O> implements MiniBatchHandle {
 
   private static final Logger LOG = LoggerFactory.getLogger(FlinkAppendHandle.class);
 
-  private boolean shouldRollover = false;
+  private boolean isClosed = false;
 
   public FlinkAppendHandle(
       HoodieWriteConfig config,
@@ -92,49 +88,37 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
     return true;
   }
 
-  public boolean shouldRollover() {
-    return this.shouldRollover;
-  }
-
-  /**
-   * Appends new records into this append handle.
-   * @param recordItr The new records iterator
-   */
-  public void appendNewRecords(Iterator<HoodieRecord<T>> recordItr) {
-    this.recordItr = recordItr;
-  }
-
   @Override
   public List<WriteStatus> close() {
-    shouldRollover = true;
-    // flush any remaining records to disk
-    appendDataAndDeleteBlocks(header);
-    // need to fix that the incremental write size in bytes may be lost
-    List<WriteStatus> ret = new ArrayList<>(statuses);
-    statuses.clear();
-    return ret;
-  }
-
-  @Override
-  public void finishWrite() {
-    LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
     try {
-      if (writer != null) {
-        writer.close();
-      }
-    } catch (IOException e) {
-      throw new HoodieUpsertException("Failed to close append handle", e);
+      return super.close();
+    } finally {
+      this.isClosed = true;
     }
   }
 
   @Override
   public void closeGracefully() {
+    if (isClosed) {
+      return;
+    }
     try {
-      finishWrite();
+      close();
     } catch (Throwable throwable) {
       // The intermediate log file can still append based on the incremental MERGE semantics,
       // there is no need to delete the file.
       LOG.warn("Error while trying to dispose the APPEND handle", throwable);
     }
   }
+
+  @Override
+  public Path getWritePath() {
+    return writer.getLogFile().getPath();
+  }
+
+  @Override
+  public boolean shouldReplace() {
+    // log files can append new data buffer directly
+    return false;
+  }
 }
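
For the MOR log path, shouldReplace() returns false because a log file can simply take the next data buffer in append mode: each mini-batch closes its writer and the following batch re-opens the same log file. A tiny JDK-only illustration of that append-per-batch idea (hypothetical helper, not the Hudi log format):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.List;

    final class LogAppendDemo {
      // Each mini-batch opens, appends and closes; no replace/rename step is needed.
      static void appendBatch(Path logFile, List<String> records) throws IOException {
        Files.write(logFile, records,
            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
      }
    }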
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index fe5bf99..2de6a57 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -23,11 +23,9 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
@@ -36,27 +34,24 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 
 /**
- * A {@link HoodieCreateHandle} that supports create write incrementally(mini-batches).
+ * A {@link HoodieCreateHandle} that supports CREATE write incrementally (mini-batches).
  *
- * <p>For the first mini-batch, it initialize and set up the next file path to write,
- * but does not close the file writer until all the mini-batches write finish. Each mini-batch
- * data are appended to the same file.
+ * <p>For the first mini-batch, it initializes and sets up the next file path to write,
+ * then closes the file writer. Each subsequent mini-batch is written to a file with a new name,
+ * which is then renamed to this file name,
+ * so it behaves as if each mini-batch were appended to the same file.
  *
- * @param <T> Payload type
- * @param <I> Input type
- * @param <K> Key type
- * @param <O> Output type
+ * @see FlinkMergeAndReplaceHandle
  */
 public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
     extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle {
 
   private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class);
-  private long lastFileSize = 0L;
-  private long totalRecordsWritten = 0L;
+
+  private boolean isClosed = false;
 
   public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                            String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
@@ -138,57 +133,22 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
     return makeNewFilePath(partitionPath, dataFileName);
   }
 
-  /**
-   * Get the incremental write status. In mini-batch write mode,
-   * this handle would be reused for a checkpoint bucket(the bucket is appended as mini-batches),
-   * thus, after a mini-batch append finish, we do not close the underneath writer but return
-   * the incremental WriteStatus instead.
-   *
-   * @return the incremental write status
-   */
-  private WriteStatus getIncrementalWriteStatus() {
-    try {
-      setupWriteStatus();
-      // reset the write status
-      totalRecordsWritten += recordsWritten;
-      recordsWritten = 0;
-      recordsDeleted = 0;
-      insertRecordsWritten = 0;
-      timer = new HoodieTimer().startTimer();
-      writeStatus.setTotalErrorRecords(0);
-      return writeStatus;
-    } catch (IOException e) {
-      throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
-    }
-  }
-
   @Override
-  protected long computeTotalWriteBytes() {
-    long fileSizeInBytes = computeFileSizeInBytes();
-    long incFileSizeInBytes = fileSizeInBytes - lastFileSize;
-    this.lastFileSize = fileSizeInBytes;
-    return incFileSizeInBytes;
-  }
-
-  @Override
-  protected long computeFileSizeInBytes() {
-    return fileWriter.getBytesWritten();
-  }
-
-  @Override
-  public void finishWrite() {
-    LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + totalRecordsWritten);
+  public List<WriteStatus> close() {
     try {
-      fileWriter.close();
-    } catch (IOException e) {
-      throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
+      return super.close();
+    } finally {
+      this.isClosed = true;
     }
   }
 
   @Override
   public void closeGracefully() {
+    if (isClosed) {
+      return;
+    }
     try {
-      finishWrite();
+      close();
     } catch (Throwable throwable) {
       LOG.warn("Error while trying to dispose the CREATE handle", throwable);
       try {
@@ -201,11 +161,8 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
     }
   }
 
-  /**
-   * Performs actions to durably, persist the current changes and returns a WriteStatus object.
-   */
   @Override
-  public List<WriteStatus> close() {
-    return Collections.singletonList(getIncrementalWriteStatus());
+  public Path getWritePath() {
+    return path;
   }
 }
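
FlinkCreateHandle is now only responsible for the first mini-batch of a bucket: it writes the new base file, closes it in close(), and exposes the path via getWritePath() so the following mini-batches can be routed to a merge-and-replace handle. A minimal sketch of that first step, JDK only and with hypothetical names:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;

    final class FirstBatchCreate {
      // Write the first mini-batch as a brand new base file and hand back its path.
      static Path createBaseFile(Path partitionDir, String fileName, List<String> records)
          throws IOException {
        Files.createDirectories(partitionDir);
        Path baseFile = partitionDir.resolve(fileName);
        Files.write(baseFile, records); // the writer is closed when this call returns
        return baseFile;                // later batches merge against this path
      }
    }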
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
new file mode 100644
index 0000000..44d630a
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A {@link HoodieMergeHandle} that supports MERGE write incrementally (small data buffers).
+ *
+ * <p>This handle is needed from the second mini-batch write on for a COW data bucket
+ * when the data bucket is written using multiple mini-batches.
+ *
+ * <p>For the incremental data buffer, it initializes and sets up the next file path to write,
+ * then closes the file and renames it to the old file name,
+ * so it behaves as if the new data buffer were appended to the old file.
+ */
+public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieMergeHandle<T, I, K, O>
+    implements MiniBatchHandle {
+
+  private static final Logger LOG = LogManager.getLogger(FlinkMergeAndReplaceHandle.class);
+
+  private boolean isClosed = false;
+
+  public FlinkMergeAndReplaceHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                    Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+                                    TaskContextSupplier taskContextSupplier, Path basePath) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
+        new HoodieBaseFile(basePath.toString()));
+    // delete invalid data files generated by task retry.
+    if (getAttemptId() > 0) {
+      deleteInvalidDataFile(getAttemptId() - 1);
+    }
+  }
+
+  /**
+   * The Flink checkpoints start in sequence and asynchronously: when one write task finishes checkpoint (A)
+   * (so the fs view already sees the written data files, some of which may be invalid),
+   * it immediately starts the write for the next checkpoint (B)
+   * and may try to reuse the last small data bucket (small file) of an invalid data file.
+   * When the coordinator finally receives the checkpoint success event of checkpoint (A),
+   * the invalid data file gets cleaned up,
+   * and this merger hits a FileNotFoundException when it closes the write file handle.
+   *
+   * <p>To solve this, delete the invalid data file eagerly
+   * so that the small bucket of an invalid file is never reused.
+   *
+   * @param lastAttemptId The last attempt ID
+   */
+  private void deleteInvalidDataFile(long lastAttemptId) {
+    final String lastWriteToken = FSUtils.makeWriteToken(getPartitionId(), getStageId(), lastAttemptId);
+    final String lastDataFileName = FSUtils.makeDataFileName(instantTime,
+        lastWriteToken, this.fileId, hoodieTable.getBaseFileExtension());
+    final Path path = makeNewFilePath(partitionPath, lastDataFileName);
+    try {
+      if (fs.exists(path)) {
+        LOG.info("Deleting invalid MERGE and REPLACE base file due to task retry: " + lastDataFileName);
+        fs.delete(path, false);
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Error while deleting the MERGE and REPLACE base file due to task retry: " + lastDataFileName, e);
+    }
+  }
+
+  @Override
+  protected void createMarkerFile(String partitionPath, String dataFileName) {
+    // no need to create any marker file for intermediate file.
+  }
+
+  @Override
+  protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
+    // the old and new file names are expected to be the same.
+    if (!oldFileName.equals(newFileName)) {
+      LOG.warn("MERGE and REPLACE handle expects the same name for old and new files,\n"
+          + "but got new file: " + newFileName + " with old file: " + oldFileName + ",\n"
+          + "this rarely happens, when the checkpoint success event has not been received yet\n"
+          + "but the write task flushes with a new instant time; it does not break the UPSERT semantics");
+    }
+    super.makeOldAndNewFilePaths(partitionPath, oldFileName, newFileName);
+    try {
+      int rollNumber = 0;
+      while (fs.exists(newFilePath)) {
+        Path oldPath = newFilePath;
+        newFileName = newFileNameWithRollover(rollNumber++);
+        newFilePath = makeNewFilePath(partitionPath, newFileName);
+        LOG.warn("Duplicate write for MERGE and REPLACE handle with path: " + oldPath + ", rolls over to new path: " + newFilePath);
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Checking existing path for merge and replace handle error: " + newFilePath, e);
+    }
+  }
+
+  /**
+   * Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write.
+   */
+  protected String newFileNameWithRollover(int rollNumber) {
+    // make the intermediate file as hidden
+    return FSUtils.makeDataFileName(instantTime, writeToken + "-" + rollNumber,
+        this.fileId, hoodieTable.getBaseFileExtension());
+  }
+
+  @Override
+  protected void setWriteStatusPath() {
+    // should still report the old file path.
+    writeStatus.getStat().setPath(new Path(config.getBasePath()), oldFilePath);
+  }
+
+  boolean needsUpdateLocation() {
+    // No need to update location for Flink hoodie records because all the records are pre-tagged
+    // with the desired locations.
+    return false;
+  }
+
+  public void finalizeWrite() {
+    // The file visibility should be kept by the configured ConsistencyGuard instance.
+    try {
+      fs.delete(oldFilePath, false);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error while cleaning the old base file: " + oldFilePath, e);
+    }
+    try {
+      fs.rename(newFilePath, oldFilePath);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error while renaming the temporary roll file: "
+          + newFilePath + " to old base file name: " + oldFilePath, e);
+    }
+  }
+
+  @Override
+  public List<WriteStatus> close() {
+    try {
+      List<WriteStatus> writeStatuses = super.close();
+      finalizeWrite();
+      return writeStatuses;
+    } finally {
+      this.isClosed = true;
+    }
+  }
+
+  @Override
+  public void closeGracefully() {
+    if (isClosed) {
+      return;
+    }
+    try {
+      close();
+    } catch (Throwable throwable) {
+      LOG.warn("Error while trying to dispose the MERGE handle", throwable);
+      try {
+        fs.delete(newFilePath, false);
+        LOG.info("Deleting the intermediate MERGE and REPLACE data file: " + newFilePath + " success!");
+      } catch (IOException e) {
+        // logging a warning and ignore the exception.
+        LOG.warn("Deleting the intermediate MERGE and REPLACE data file: " + newFilePath + " failed", e);
+      }
+    }
+  }
+
+  @Override
+  public Path getWritePath() {
+    return oldFilePath;
+  }
+}
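
The heart of FlinkMergeAndReplaceHandle is finalizeWrite(): the merged output is written under a rolled-over temporary name and then swapped into the old file name (delete old, rename new), so a data bucket always keeps exactly one base file per file ID. A simplified stand-alone version of that swap with JDK NIO, ignoring the ConsistencyGuard and Hadoop FileSystem details:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    final class ReplaceFinalizer {
      // Swap the freshly merged file into the old base file's name.
      static void finalizeWrite(Path oldFilePath, Path newFilePath) throws IOException {
        Files.deleteIfExists(oldFilePath);    // drop the stale base file first
        Files.move(newFilePath, oldFilePath); // then rename the merged output onto it
      }
    }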
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 518ea69..870a6aa 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -23,12 +23,10 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.MarkerFiles;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
@@ -40,7 +38,7 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
- * A {@link HoodieMergeHandle} that supports merge write incrementally(small data buffers).
+ * A {@link HoodieMergeHandle} that supports MERGE write incrementally (small data buffers).
  *
  * <p>For a new data buffer, it initialize and set up the next file path to write,
  * and closes the file path when the data buffer write finish. When next data buffer
@@ -48,10 +46,7 @@ import java.util.List;
  * for a checkpoint round, it renames the last new file path as the desired file name
  * (name with the expected file ID).
  *
- * @param <T> Payload type
- * @param <I> Input type
- * @param <K> Key type
- * @param <O> Output type
+ * @see FlinkMergeAndReplaceHandle
  */
 public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
     extends HoodieMergeHandle<T, I, K, O>
@@ -59,15 +54,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
 
   private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class);
 
-  /**
-   * Records the current file handles number that rolls over.
-   */
-  private int rollNumber = 0;
-
-  /**
-   * Whether the handle should roll over to new, E.G. the handle has written some intermediate files already.
-   */
-  private volatile boolean shouldRollover = false;
+  private boolean isClosed = false;
 
   /**
    * Records the rolled over file paths.
@@ -130,6 +117,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
     super.makeOldAndNewFilePaths(partitionPath, oldFileName, newFileName);
     rolloverPaths = new ArrayList<>();
     try {
+      int rollNumber = 0;
       while (fs.exists(newFilePath)) {
         oldFilePath = newFilePath; // override the old file name
         rolloverPaths.add(oldFilePath);
@@ -151,15 +139,15 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
         this.fileId, hoodieTable.getBaseFileExtension());
   }
 
-  public boolean shouldRollover() {
-    return shouldRollover;
-  }
-
   @Override
   public List<WriteStatus> close() {
-    List<WriteStatus> writeStatus = super.close();
-    this.shouldRollover = true;
-    return writeStatus;
+    try {
+      List<WriteStatus> writeStatus = super.close();
+      finalizeWrite();
+      return writeStatus;
+    } finally {
+      this.isClosed = true;
+    }
   }
 
   boolean needsUpdateLocation() {
@@ -168,53 +156,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
     return false;
   }
 
-  /**
-   *
-   * Rolls over the write handle to prepare for the next batch write.
-   *
-   * <p>It tweaks the handle state as following:
-   *
-   * <ul>
-   *   <li>Increment the {@code rollNumber}</li>
-   *   <li>Book-keep the last file path, these files (except the last one) are temporary that need to be cleaned</li>
-   *   <li>Make the last new file path as old</li>
-   *   <li>Initialize the new file path and file writer</li>
-   * </ul>
-   *
-   * @param newRecordsItr The records iterator to update
-   */
-  public void rollOver(Iterator<HoodieRecord<T>> newRecordsItr) {
-    init(this.fileId, newRecordsItr);
-    this.recordsWritten = 0;
-    this.recordsDeleted = 0;
-    this.updatedRecordsWritten = 0;
-    this.insertRecordsWritten = 0;
-    this.writeStatus.setTotalErrorRecords(0);
-    this.timer = new HoodieTimer().startTimer();
-
-    rollNumber += 1;
-
-    rolloverPaths.add(newFilePath);
-    oldFilePath = newFilePath;
-    final String newFileName = newFileNameWithRollover(rollNumber);
-    newFilePath = makeNewFilePath(partitionPath, newFileName);
-
-    // create the marker file so that the intermediate roll over files
-    // of the retry task can be cleaned.
-    MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
-    markerFiles.createIfNotExists(partitionPath, newFileName, getIOType());
-
-    try {
-      fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier);
-    } catch (IOException e) {
-      throw new HoodieIOException("Error when creating file writer for path " + newFilePath, e);
-    }
-
-    LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
-        newFilePath.toString()));
-  }
-
-  public void finishWrite() {
+  public void finalizeWrite() {
     // The file visibility should be kept by the configured ConsistencyGuard instance.
     rolloverPaths.add(newFilePath);
     if (rolloverPaths.size() == 1) {
@@ -241,8 +183,11 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
 
   @Override
   public void closeGracefully() {
+    if (isClosed) {
+      return;
+    }
     try {
-      finishWrite();
+      close();
     } catch (Throwable throwable) {
       LOG.warn("Error while trying to dispose the MERGE handle", throwable);
       try {
@@ -254,4 +199,9 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
       }
     }
   }
+
+  @Override
+  public Path getWritePath() {
+    return newFilePath;
+  }
 }
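
Both merge handles guard against duplicate writes by probing for an existing target path and rolling the write token with a "-<rollNumber>" suffix until a free name is found. Roughly, assuming a simplified fileId_writeToken layout for the file name (the real layout comes from FSUtils.makeDataFileName):

    import java.nio.file.Files;
    import java.nio.file.Path;

    final class RolloverNaming {
      // Probe for a free target name; the roll number is appended to the write token part.
      static Path nextFreePath(Path dir, String writeToken, String fileId, String ext) {
        Path candidate = dir.resolve(fileId + "_" + writeToken + ext);
        int rollNumber = 0;
        while (Files.exists(candidate)) {
          candidate = dir.resolve(fileId + "_" + writeToken + "-" + rollNumber++ + ext);
        }
        return candidate;
      }
    }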
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
index 30ac317..5f022d0 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
@@ -18,28 +18,36 @@
 
 package org.apache.hudi.io;
 
+import org.apache.hadoop.fs.Path;
+
 /**
  * Hoodie write handle that supports write as mini-batch.
  */
 public interface MiniBatchHandle {
 
   /**
-   * Returns whether the handle should roll over to new,
-   * E.G. the handle has written some intermediate data buffer already.
-   */
-  default boolean shouldRollover() {
-    return false;
-  }
-
-  /**
-   * Finish the write of multiple mini-batches. Usually these mini-bathes
-   * come from one checkpoint interval.
+   * Finalize the write of one mini-batch. Usually these mini-batches
+   * come from one checkpoint interval. The file handle may roll over to a new name
+   * if the name conflicts, giving a chance to clean up the intermediate files.
    */
-  void finishWrite();
+  default void finalizeWrite() {}
 
   /**
    * Close the file handle gracefully, if any error happens during the file handle close,
    * clean the file to not left corrupted file.
    */
   void closeGracefully();
+
+  /**
+   * Returns the write file path.
+   */
+  Path getWritePath();
+
+  /**
+   * Whether the old write file should be replaced by a new file with the same name,
+   * whose content is the old content merged with the incremental new data batch.
+   */
+  default boolean shouldReplace() {
+    return true;
+  }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index f3e10e8..5cfd28b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -32,11 +32,9 @@ import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
-import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.FlinkLazyInsertIterable;
 import org.apache.hudi.io.ExplicitWriteHandleFactory;
-import org.apache.hudi.io.FlinkMergeHandle;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieWriteHandle;
@@ -160,10 +158,6 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
     }
   }
 
-  protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
-    return Collections.emptyMap();
-  }
-
   @Override
   protected boolean isWorkloadProfileNeeded() {
     return true;
@@ -177,26 +171,29 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
       BucketType bucketType,
       Iterator recordItr) {
     try {
-      switch (bucketType) {
-        case INSERT:
-          return handleInsert(fileIdHint, recordItr);
-        case UPDATE:
-          if (this.writeHandle instanceof HoodieCreateHandle) {
-            // During one checkpoint interval, an insert record could also be updated,
-            // for example, for an operation sequence of a record:
-            //    I, U,   | U, U
-            // - batch1 - | - batch2 -
-            // the first batch(batch1) operation triggers an INSERT bucket,
-            // the second batch batch2 tries to reuse the same bucket
-            // and append instead of UPDATE.
+      if (this.writeHandle instanceof HoodieCreateHandle) {
+        // During one checkpoint interval, an insert record could also be updated,
+        // for example, for an operation sequence of a record:
+        //    I, U,   | U, U
+        // - batch1 - | - batch2 -
+        // the first batch (batch1) operation triggers an INSERT bucket,
+        // the second batch (batch2) tries to reuse the same bucket
+        // and appends instead of UPDATE.
+        return handleInsert(fileIdHint, recordItr);
+      } else if (this.writeHandle instanceof HoodieMergeHandle) {
+        return handleUpdate(partitionPath, fileIdHint, recordItr);
+      } else {
+        switch (bucketType) {
+          case INSERT:
             return handleInsert(fileIdHint, recordItr);
-          }
-          return handleUpdate(partitionPath, fileIdHint, recordItr);
-        default:
-          throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath);
+          case UPDATE:
+            return handleUpdate(partitionPath, fileIdHint, recordItr);
+          default:
+            throw new AssertionError();
+        }
       }
     } catch (Throwable t) {
-      String msg = "Error upserting bucketType " + bucketType + " for partition :" + partitionPath;
+      String msg = "Error upsetting bucketType " + bucketType + " for partition :" + partitionPath;
       LOG.error(msg, t);
       throw new HoodieUpsertException(msg, t);
     }
@@ -212,7 +209,7 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
       return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
     }
     // these are updates
-    HoodieMergeHandle upsertHandle = getUpdateHandle(recordItr);
+    HoodieMergeHandle<?, ?, ?, ?> upsertHandle = (HoodieMergeHandle<?, ?, ?, ?>) this.writeHandle;
     return handleUpdateInternal(upsertHandle, fileId);
   }
 
@@ -234,19 +231,6 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
     return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
   }
 
-  protected FlinkMergeHandle getUpdateHandle(Iterator<HoodieRecord<T>> recordItr) {
-    if (table.requireSortedRecords()) {
-      throw new HoodieNotSupportedException("Sort records are not supported in Flink streaming write");
-    } else {
-      FlinkMergeHandle writeHandle = (FlinkMergeHandle) this.writeHandle;
-      // add the incremental records.
-      if (writeHandle.shouldRollover()) {
-        writeHandle.rollOver(recordItr);
-      }
-      return writeHandle;
-    }
-  }
-
   @Override
   public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
       throws Exception {
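
The executor no longer branches on the bucket type first; it routes by the type of the pre-created write handle, so a bucket that started as INSERT in batch1 keeps appending through its create handle even when batch2 carries updates (the I, U | U, U example in the comment above). A condensed sketch of that routing decision, with hypothetical handle types:

    final class BucketRouter {
      enum BucketType { INSERT, UPDATE }

      interface WriteHandle {}
      static final class CreateHandle implements WriteHandle {}
      static final class MergeHandle implements WriteHandle {}

      // The pre-assigned handle wins over the nominal bucket type.
      static String route(WriteHandle handle, BucketType bucketType) {
        if (handle instanceof CreateHandle) {
          return "insert";      // keep appending to the insert bucket of this checkpoint
        } else if (handle instanceof MergeHandle) {
          return "update";      // merge with (and replace) the existing base file
        }
        switch (bucketType) {   // fall back to the bucket type from the workload profile
          case INSERT: return "insert";
          case UPDATE: return "update";
          default: throw new AssertionError();
        }
      }
    }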
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index 5e9d8cb..6391750 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.FlinkMergeHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -62,17 +61,14 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
 
   @Override
   public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-                       HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> upsertHandle) throws IOException {
-    final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
-    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
-    HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle = upsertHandle;
-    HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
-
+                       HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle) throws IOException {
     final GenericDatumWriter<GenericRecord> gWriter;
     final GenericDatumReader<GenericRecord> gReader;
     Schema readSchema;
-    if (isFirstTimeMerge(mergeHandle)
-        && (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent())) {
+
+    final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
+    HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
+    if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
       readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
       gWriter = new GenericDatumWriter<>(readSchema);
       gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields());
@@ -83,10 +79,11 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
     }
 
     BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
     HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
     try {
       final Iterator<GenericRecord> readerIterator;
-      if (isFirstTimeMerge(mergeHandle) && baseFile.getBootstrapBaseFile().isPresent()) {
+      if (baseFile.getBootstrapBaseFile().isPresent()) {
         readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
       } else {
         readerIterator = reader.getRecordIterator(readSchema);
@@ -114,9 +111,4 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
       }
     }
   }
-
-  private static boolean isFirstTimeMerge(HoodieMergeHandle<?, ?, ?, ?> mergeHandle) {
-    return mergeHandle instanceof FlinkMergeHandle && !((FlinkMergeHandle) mergeHandle).shouldRollover();
-  }
-
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
index 674a8fd..1fe9820 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
@@ -49,9 +49,6 @@ public abstract class BaseFlinkDeltaCommitActionExecutor<T extends HoodieRecordP
   @Override
   public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
     FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle;
-    if (appendHandle.shouldRollover()) {
-      appendHandle.appendNewRecords(recordItr);
-    }
     appendHandle.doAppend();
     List<WriteStatus> writeStatuses = appendHandle.close();
     return Collections.singletonList(writeStatuses).iterator();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index d9cb389..e35419a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -143,22 +143,6 @@ public class StreamWriteFunction<K, I, O>
   private transient TotalSizeTracer tracer;
 
   /**
-   * Flag saying whether the write task is waiting for the checkpoint success notification
-   * after it finished a checkpoint.
-   *
-   * <p>The flag is needed because the write task does not block during the waiting time interval,
-   * some data buckets still flush out with old instant time. There are two cases that the flush may produce
-   * corrupted files if the old instant is committed successfully:
-   * 1) the write handle was writing data but interrupted, left a corrupted parquet file;
-   * 2) the write handle finished the write but was not closed, left an empty parquet file.
-   *
-   * <p>To solve, when this flag was set to true, we flush the data buffer with a new instant time = old instant time + 1ms,
-   * the new instant time would affect the write file name. The filesystem view does not recognize the file as committed because
-   * it always filters the data files based on successful commit time.
-   */
-  private volatile boolean confirming = false;
-
-  /**
    * Constructs a StreamingSinkFunction.
    *
    * @param config The config options
@@ -207,7 +191,7 @@ public class StreamWriteFunction<K, I, O>
 
   @Override
   public void notifyCheckpointComplete(long checkpointId) {
-    this.confirming = false;
+    this.writeClient.cleanHandles();
   }
 
   /**
@@ -416,16 +400,12 @@ public class StreamWriteFunction<K, I, O>
       return;
     }
 
-    // if we are waiting for the checkpoint notification, shift the write instant time.
-    boolean shift = confirming && StreamerUtil.equal(instant, this.currentInstant);
-    final String flushInstant = shift ? StreamerUtil.instantTimePlus(instant, 1) : instant;
-
     List<HoodieRecord> records = bucket.records;
     ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
     if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
       records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
     }
-    final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, flushInstant));
+    final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
     final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
         .taskID(taskID)
         .instantTime(instant) // the write instant may shift but the event still use the currentInstant.
@@ -473,7 +453,5 @@ public class StreamWriteFunction<K, I, O>
     this.eventGateway.sendEventToCoordinator(event);
     this.buckets.clear();
     this.tracer.reset();
-    this.writeClient.cleanHandles();
-    this.confirming = true;
   }
 }
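
On the sink side, the old confirming flag and the +1ms instant shift are gone: data is always flushed with the current instant, and the handle registry is cleared only when notifyCheckpointComplete arrives, i.e. once it is safe to forget the write paths of the finished interval. A bare-bones sketch of that lifecycle with hypothetical names and no Flink dependencies:

    import java.util.ArrayList;
    import java.util.List;

    final class MiniBatchSink {
      private final List<String> buffer = new ArrayList<>();
      private final HandleCleaner writeClient;

      MiniBatchSink(HandleCleaner writeClient) {
        this.writeClient = writeClient;
      }

      void bufferRecord(String record) {
        buffer.add(record);
        if (buffer.size() >= 1024) {    // mini-batch threshold, illustrative only
          flushBucket();
        }
      }

      void flushBucket() {
        // write with the current instant; the handle closes itself at the end of the batch
        buffer.clear();
      }

      void notifyCheckpointComplete(long checkpointId) {
        writeClient.cleanHandles();     // the finished interval's handles are already closed
      }

      interface HandleCleaner {
        void cleanHandles();
      }
    }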
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 7da9801..21fbc5a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -500,9 +500,8 @@ public class TestWriteCopyOnWrite {
 
   Map<String, String> getMiniBatchExpected() {
     Map<String, String> expected = new HashMap<>();
-    expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, "
-        + "id1,par1,id1,Danny,23,1,par1, "
-        + "id1,par1,id1,Danny,23,1,par1, "
+    // the last 3 lines are merged
+    expected.put("par1", "["
         + "id1,par1,id1,Danny,23,1,par1, "
         + "id1,par1,id1,Danny,23,1,par1]");
     return expected;
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 5050109..a4b6c16 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -163,6 +163,7 @@ public class StreamWriteFunctionWrapper<I> {
     functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
     coordinator.notifyCheckpointComplete(checkpointId);
     this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
+    this.writeFunction.notifyCheckpointComplete(checkpointId);
     if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
       try {
         compactFunctionWrapper.compact(checkpointId);