Posted to commits@hudi.apache.org by wa...@apache.org on 2021/06/29 00:54:07 UTC

[hudi] branch master updated: [HUDI-2084] Resend the uncommitted write metadata when start up (#3168)

This is an automated email from the ASF dual-hosted git repository.

wangxianghu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 37b7c65  [HUDI-2084] Resend the uncommitted write metadata when start up (#3168)
37b7c65 is described below

commit 37b7c65d8a3ede00ae16909a06e31c24f179998c
Author: yuzhaojing <32...@users.noreply.github.com>
AuthorDate: Tue Jun 29 08:53:52 2021 +0800

    [HUDI-2084] Resend the uncommitted write metadata when start up (#3168)
    
    Co-authored-by: 喻兆靖 <yu...@bilibili.com>
---
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  26 +--
 .../java/org/apache/hudi/util/FlinkClientUtil.java |   9 ++
 .../org/apache/hudi/sink/StreamWriteFunction.java  | 155 ++++++++++++++----
 .../org/apache/hudi/sink/StreamWriteOperator.java  |   2 +-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 174 ++++++++++++++-------
 .../hudi/sink/compact/HoodieFlinkCompactor.java    |   2 +-
 ...teSuccessEvent.java => WriteMetadataEvent.java} |  41 +++--
 .../java/org/apache/hudi/util/CompactionUtil.java  |  11 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  14 +-
 .../org/apache/hudi/sink/StreamWriteITCase.java    |   2 +-
 .../sink/TestStreamWriteOperatorCoordinator.java   |  43 ++++-
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |  42 ++---
 .../sink/utils/StreamWriteFunctionWrapper.java     |  30 +++-
 13 files changed, 386 insertions(+), 165 deletions(-)
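
For orientation before the diff: on startup the write task now either resends the
uncommitted write metadata restored from its operator state, or sends an empty
bootstrap event; the coordinator recommits the restored instant if it never made it
onto the timeline, then starts a new one. A condensed sketch of that handshake
(illustrative, pieced together from the changes below, not the exact Hudi code):

    // Write task side, in initializeState():
    if (context.isRestored()) {
      restoreWriteMetadata();   // resend the uncommitted WriteMetadataEvent, if any
    } else {
      sendBootstrapEvent();     // empty event with isBootstrap = true
    }
    this.confirming = true;     // block flushing until the coordinator starts a new instant

    // Coordinator side, once a bootstrap event has arrived from every task:
    if (instant.equals("") || completedTimeline.containsInstant(instant)) {
      reset();                  // the last instant committed successfully
    } else {
      commitInstant(instant);   // recommit the resent write metadata
    }
    startInstant();             // unblock the waiting write tasks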

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index aa930f7..05e4481 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -58,6 +58,7 @@ import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
 import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
+import org.apache.hudi.util.FlinkClientUtil;
 
 import com.codahale.metrics.Timer;
 import org.apache.hadoop.conf.Configuration;
@@ -174,7 +175,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   /**
    * Removes all existing records from the partitions affected and inserts the given HoodieRecords, into the table.
    *
-   * @param records HoodieRecords to insert
+   * @param records     HoodieRecords to insert
    * @param instantTime Instant time of the commit
    * @return list of WriteStatus to inspect errors and counts
    */
@@ -194,7 +195,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   /**
    * Removes all existing records of the Hoodie table and inserts the given HoodieRecords, into the table.
    *
-   * @param records HoodieRecords to insert
+   * @param records     HoodieRecords to insert
    * @param instantTime Instant time of the commit
    * @return list of WriteStatus to inspect errors and counts
    */
@@ -235,7 +236,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
         getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
     preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
-    HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context,instantTime, keys);
+    HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context, instantTime, keys);
     return postWrite(result, instantTime, table);
   }
 
@@ -391,11 +392,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   /**
    * Get or create a new write handle in order to reuse the file handles.
    *
-   * @param record        The first record in the bucket
-   * @param config        Write config
-   * @param instantTime   The instant time
-   * @param table         The table
-   * @param recordItr     Record iterator
+   * @param record      The first record in the bucket
+   * @param config      Write config
+   * @param instantTime The instant time
+   * @param table       The table
+   * @param recordItr   Record iterator
    * @return Existing write handle or create a new one
    */
   private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
@@ -454,7 +455,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }
 
   public String getLastPendingInstant(String actionType) {
-    HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
+    HoodieTimeline unCompletedTimeline = FlinkClientUtil.createMetaClient(basePath)
+        .getCommitsTimeline().filterInflightsAndRequested();
     return unCompletedTimeline.getInstants()
         .filter(x -> x.getAction().equals(actionType))
         .map(HoodieInstant::getTimestamp)
@@ -465,7 +467,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
 
   public String getLastCompletedInstant(HoodieTableType tableType) {
     final String commitType = CommitUtils.getCommitActionType(tableType);
-    HoodieTimeline completedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline completedTimeline = FlinkClientUtil.createMetaClient(basePath)
+        .getCommitsTimeline().filterCompletedInstants();
     return completedTimeline.getInstants()
         .filter(x -> x.getAction().equals(commitType))
         .map(HoodieInstant::getTimestamp)
@@ -475,8 +478,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }
 
   public void transitionRequestedToInflight(String commitType, String inFlightInstant) {
-    HoodieFlinkTable<T> table = getHoodieTable();
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    HoodieActiveTimeline activeTimeline = FlinkClientUtil.createMetaClient(basePath).getActiveTimeline();
     HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant);
     activeTimeline.transitionRequestedToInflight(requested, Option.empty(),
         config.shouldAllowMultiWriteOnSameInstant());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
index 4112e2b..65daf78 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -30,6 +32,13 @@ import java.io.File;
 public class FlinkClientUtil {
 
   /**
+   * Creates the meta client.
+   */
+  public static HoodieTableMetaClient createMetaClient(String basePath) {
+    return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(FlinkClientUtil.getHadoopConf()).build();
+  }
+
+  /**
    * Parses the file name from path.
    */
   public static String parseFileName(String path) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 9b1f4d1..8edf02c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -32,11 +32,14 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.table.action.commit.FlinkWriteHelper;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -54,6 +57,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
@@ -76,20 +80,18 @@ import java.util.stream.Collectors;
  * starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always
  * start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists.
  *
- * <p>In order to improve the throughput, The function process thread does not block data buffering
- * after the checkpoint thread starts flushing the existing data buffer. So there is possibility that the next checkpoint
- * batch was written to current checkpoint. When a checkpoint failure triggers the write rollback, there may be some duplicate records
- * (e.g. the eager write batch), the semantics is still correct using the UPSERT operation.
+ * <p>The function process thread blocks data buffering after the checkpoint thread finishes flushing the existing data buffer, until
+ * the current checkpoint succeeds and the coordinator starts a new instant. Any error during metadata committing triggers a job failure;
+ * when the job recovers from a failure, the write function re-sends the write metadata to the coordinator to check whether it can be
+ * re-committed. Thus, if an unexpected error happens while committing an instant, the coordinator retries the commit when the job
+ * recovers.
  *
  * <p><h2>Fault Tolerance</h2>
  *
- * <p>The operator coordinator checks and commits the last instant then starts a new one when a checkpoint finished successfully.
- * The operator rolls back the written data and throws to trigger a failover when any error occurs.
- * This means one Hoodie instant may span one or more checkpoints(some checkpoints notifications may be skipped).
- * If a checkpoint timed out, the next checkpoint would help to rewrite the left buffer data (clean the buffer in the last
- * step of the #flushBuffer method).
- *
- * <p>The operator coordinator would try several times when committing the write status.
+ * <p>The operator coordinator checks and commits the last instant, then starts a new one after a checkpoint finishes successfully.
+ * It rolls back any inflight instant before starting a new instant, which means one Hoodie instant spans exactly one checkpoint.
+ * The write function blocks data buffer flushing for the configured checkpoint timeout
+ * before it throws an exception; any checkpoint failure eventually triggers a job failure.
  *
  * <p>Note: The function task requires the input stream be shuffled by the file IDs.
  *
@@ -163,6 +165,16 @@ public class StreamWriteFunction<K, I, O>
   private volatile boolean confirming = false;
 
   /**
+   * List state of the write metadata events.
+   */
+  private transient ListState<WriteMetadataEvent> writeMetadataState;
+
+  /**
+   * Write status list for the current checkpoint.
+   */
+  private List<WriteStatus> writeStatuses;
+
+  /**
    * Constructs a StreamingSinkFunction.
    *
    * @param config The config options
@@ -173,27 +185,43 @@ public class StreamWriteFunction<K, I, O>
 
   @Override
   public void open(Configuration parameters) throws IOException {
-    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-    this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
-    this.actionType = CommitUtils.getCommitActionType(
-        WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
-        HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
     this.tracer = new TotalSizeTracer(this.config);
     initBuffer();
     initWriteFunction();
   }
 
   @Override
-  public void initializeState(FunctionInitializationContext context) {
-    // no operation
+  public void initializeState(FunctionInitializationContext context) throws Exception {
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
+    this.actionType = CommitUtils.getCommitActionType(
+        WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+        HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
+
+    this.writeStatuses = new ArrayList<>();
+    this.writeMetadataState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>(
+            "write-metadata-state",
+            TypeInformation.of(WriteMetadataEvent.class)
+        ));
+
+    if (context.isRestored()) {
+      restoreWriteMetadata();
+    } else {
+      sendBootstrapEvent();
+    }
+    // blocks flushing until the coordinator starts a new instant
+    this.confirming = true;
   }
 
   @Override
-  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
     // Based on the fact that the coordinator starts the checkpoint first,
     // it would check the validity.
     // wait for the buffer data flush out and request a new instant
     flushRemaining(false);
+    // Reload the write metadata state so it reflects the current checkpoint.
+    reloadWriteMetaState();
   }
 
   @Override
@@ -215,6 +243,7 @@ public class StreamWriteFunction<K, I, O>
   public void endInput() {
     flushRemaining(true);
     this.writeClient.cleanHandles();
+    this.writeStatuses.clear();
   }
 
   // -------------------------------------------------------------------------
@@ -274,6 +303,49 @@ public class StreamWriteFunction<K, I, O>
     }
   }
 
+  private void restoreWriteMetadata() throws Exception {
+    String lastInflight = this.writeClient.getLastPendingInstant(this.actionType);
+    boolean eventSent = false;
+    for (WriteMetadataEvent event : this.writeMetadataState.get()) {
+      if (Objects.equals(lastInflight, event.getInstantTime())) {
+        // The checkpoint succeeded but the metadata was not committed;
+        // re-commit the inflight instant
+        this.eventGateway.sendEventToCoordinator(event);
+        LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
+        eventSent = true;
+      }
+    }
+    if (!eventSent) {
+      sendBootstrapEvent();
+    }
+  }
+
+  private void sendBootstrapEvent() {
+    WriteMetadataEvent event = WriteMetadataEvent.builder()
+        .taskID(taskID)
+        .writeStatus(Collections.emptyList())
+        .instantTime("")
+        .isBootstrap(true)
+        .build();
+    this.eventGateway.sendEventToCoordinator(event);
+    LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
+  }
+
+  /**
+   * Reloads the write metadata state with the write statuses collected for the current checkpoint.
+   */
+  private void reloadWriteMetaState() throws Exception {
+    this.writeMetadataState.clear();
+    WriteMetadataEvent event = WriteMetadataEvent.builder()
+        .taskID(taskID)
+        .instantTime(currentInstant)
+        .writeStatus(new ArrayList<>(writeStatuses))
+        .isBootstrap(true)
+        .build();
+    this.writeMetadataState.add(event);
+    writeStatuses.clear();
+  }
+
   /**
    * Represents a data item in the buffer, this is needed to reduce the
    * memory footprint.
@@ -477,23 +549,23 @@ public class StreamWriteFunction<K, I, O>
     bucket.records.add(item);
   }
 
-  @SuppressWarnings("unchecked, rawtypes")
-  private boolean flushBucket(DataBucket bucket) {
-    String instant = this.writeClient.getLastPendingInstant(this.actionType);
-
-    if (instant == null) {
-      // in case there are empty checkpoints that has no input data
-      LOG.info("No inflight instant when flushing data, skip.");
-      return false;
-    }
+  private boolean hasData() {
+    return this.buckets.size() > 0
+        && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
+  }
 
+  private String instantToWrite() {
+    String instant = this.writeClient.getLastPendingInstant(this.actionType);
     // if exactly-once semantics turns on,
     // waits for the checkpoint notification until the checkpoint timeout threshold hits.
     if (confirming) {
       long waitingTime = 0L;
       long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
       long interval = 500L;
-      while (instant == null || instant.equals(this.currentInstant)) {
+      // wait condition:
+      // 1. there is no inflight instant
+      // 2. the inflight instant does not change and the checkpoint has buffering data
+      while (instant == null || (instant.equals(this.currentInstant) && hasData())) {
         // sleep for a while
         try {
           if (waitingTime > ckpTimeout) {
@@ -511,6 +583,18 @@ public class StreamWriteFunction<K, I, O>
       // successfully.
       confirming = false;
     }
+    return instant;
+  }
+
+  @SuppressWarnings("unchecked, rawtypes")
+  private boolean flushBucket(DataBucket bucket) {
+    String instant = instantToWrite();
+
+    if (instant == null) {
+      // in case there are empty checkpoints that have no input data
+      LOG.info("No inflight instant when flushing data, skip.");
+      return false;
+    }
 
     List<HoodieRecord> records = bucket.writeBuffer();
     ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
@@ -520,20 +604,22 @@ public class StreamWriteFunction<K, I, O>
     bucket.preWrite(records);
     final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
     records.clear();
-    final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
+    final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
         .instantTime(instant) // the write instant may shift but the event still use the currentInstant.
         .writeStatus(writeStatus)
         .isLastBatch(false)
         .isEndInput(false)
         .build();
+
     this.eventGateway.sendEventToCoordinator(event);
+    writeStatuses.addAll(writeStatus);
     return true;
   }
 
   @SuppressWarnings("unchecked, rawtypes")
   private void flushRemaining(boolean isEndInput) {
-    this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
+    this.currentInstant = instantToWrite();
     if (this.currentInstant == null) {
       // in case there are empty checkpoints that has no input data
       throw new HoodieException("No inflight instant when flushing data!");
@@ -560,17 +646,20 @@ public class StreamWriteFunction<K, I, O>
       LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
       writeStatus = Collections.emptyList();
     }
-    final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
+    final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
         .instantTime(currentInstant)
         .writeStatus(writeStatus)
         .isLastBatch(true)
         .isEndInput(isEndInput)
         .build();
+
     this.eventGateway.sendEventToCoordinator(event);
     this.buckets.clear();
     this.tracer.reset();
     this.writeClient.cleanHandles();
+    this.writeStatuses.addAll(writeStatus);
+    // blocks flushing until the coordinator starts a new instant
     this.confirming = true;
   }
 }
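
The state handling above follows the standard Flink CheckpointedFunction pattern:
declare operator ListState in initializeState, rewrite it in snapshotState, and replay
it when context.isRestored() returns true. A minimal, self-contained illustration of
just that pattern (the class and state names here are hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    public class PendingEventsFunction implements CheckpointedFunction {

      // persisted with every successful checkpoint
      private transient ListState<String> pendingState;
      // the working copy kept on the heap between checkpoints
      private final List<String> pending = new ArrayList<>();

      @Override
      public void initializeState(FunctionInitializationContext context) throws Exception {
        pendingState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("pending-events", TypeInformation.of(String.class)));
        if (context.isRestored()) {
          // replay whatever the last successful checkpoint persisted
          for (String event : pendingState.get()) {
            pending.add(event);
          }
        }
      }

      @Override
      public void snapshotState(FunctionSnapshotContext context) throws Exception {
        pendingState.clear();
        pendingState.addAll(pending); // snapshot the current pending set atomically
      }
    }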
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
index 3150d06..b0f8328 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
@@ -51,7 +51,7 @@ public class StreamWriteOperator<I>
   }
 
   @Override
-  public void endInput() throws Exception {
+  public void endInput() {
     sinkFunction.endInput();
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 7f6f816..84f3c0b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -29,7 +29,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.utils.CoordinatorExecutor;
 import org.apache.hudi.sink.utils.HiveSyncContext;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
@@ -97,7 +97,7 @@ public class StreamWriteOperatorCoordinator
    * Event buffer for one round of checkpointing. When all the elements are non-null and have the same
    * write instant, then the instant succeed and we can commit it.
    */
-  private transient BatchWriteSuccessEvent[] eventBuffer;
+  private transient WriteMetadataEvent[] eventBuffer;
 
   /**
    * Task number of the operator.
@@ -152,8 +152,6 @@ public class StreamWriteOperatorCoordinator
     this.tableState = TableState.create(conf);
     // init table, create it if not exists.
     initTableIfNotExists(this.conf);
-    // start a new instant
-    startInstant();
     // start the executor
     this.executor = new CoordinatorExecutor(this.context, LOG);
     // start the executor if required
@@ -201,7 +199,7 @@ public class StreamWriteOperatorCoordinator
           // for streaming mode, commits the ever received events anyway,
           // the stream write task snapshot and flush the data buffer synchronously in sequence,
           // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
-          final boolean committed = commitInstant();
+          final boolean committed = commitInstant(this.instant);
           if (committed) {
             // if async compaction is on, schedule the compaction
             if (asyncCompaction) {
@@ -216,30 +214,8 @@ public class StreamWriteOperatorCoordinator
     );
   }
 
-  private void syncHiveIfEnabled() {
-    if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
-      this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant);
-    }
-  }
-
-  /**
-   * Sync hoodie table metadata to Hive metastore.
-   */
-  public void syncHive() {
-    hiveSyncContext.hiveSyncTool().syncHoodieTable();
-  }
-
-  private void startInstant() {
-    final String instant = HoodieActiveTimeline.createNewInstantTime();
-    this.writeClient.startCommitWithTime(instant, tableState.commitAction);
-    this.instant = instant;
-    this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant);
-    LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
-            this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
-  }
-
   @Override
-  public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) {
+  public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
     // no operation
   }
 
@@ -248,27 +224,17 @@ public class StreamWriteOperatorCoordinator
     executor.execute(
         () -> {
           // no event to handle
-          ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
-              "The coordinator can only handle BatchWriteSuccessEvent");
-          BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
-          // the write task does not block after checkpointing(and before it receives a checkpoint success event),
-          // if it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint
-          // success event, the data buffer would flush with an older instant time.
-          ValidationUtils.checkState(
-              HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
-              String.format("Receive an unexpected event for instant %s from task %d",
-                  event.getInstantTime(), event.getTaskID()));
-          if (this.eventBuffer[event.getTaskID()] != null) {
-            this.eventBuffer[event.getTaskID()].mergeWith(event);
+          ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,
+              "The coordinator can only handle WriteMetaEvent");
+          WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;
+          if (event.isBootstrap()) {
+            handleBootstrapEvent(event);
+          } else if (event.isEndInput()) {
+            handleEndInputEvent(event);
           } else {
-            this.eventBuffer[event.getTaskID()] = event;
+            handleWriteMetaEvent(event);
           }
-          if (event.isEndInput() && allEventsReceived()) {
-            // start to commit the instant.
-            commitInstant();
-            // no compaction scheduling for batch mode
-          }
-        }, "handle write success event for instant %s", this.instant
+        }, "handle write metadata event for instant %s", this.instant
     );
   }
 
@@ -291,22 +257,108 @@ public class StreamWriteOperatorCoordinator
     this.hiveSyncContext = HiveSyncContext.create(conf);
   }
 
+  private void syncHiveIfEnabled() {
+    if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
+      this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant);
+    }
+  }
+
+  /**
+   * Sync hoodie table metadata to Hive metastore.
+   */
+  public void syncHive() {
+    hiveSyncContext.hiveSyncTool().syncHoodieTable();
+  }
+
   private void reset() {
-    this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism];
+    this.eventBuffer = new WriteMetadataEvent[this.parallelism];
   }
 
-  /** Checks the buffer is ready to commit. */
+  /**
+   * Checks whether the buffer is ready to commit.
+   */
   private boolean allEventsReceived() {
     return Arrays.stream(eventBuffer)
         .allMatch(event -> event != null && event.isReady(this.instant));
   }
 
+  private void addEventToBuffer(WriteMetadataEvent event) {
+    if (this.eventBuffer[event.getTaskID()] != null) {
+      this.eventBuffer[event.getTaskID()].mergeWith(event);
+    } else {
+      this.eventBuffer[event.getTaskID()] = event;
+    }
+  }
+
+  private void startInstant() {
+    final String instant = HoodieActiveTimeline.createNewInstantTime();
+    this.writeClient.startCommitWithTime(instant, tableState.commitAction);
+    this.instant = instant;
+    this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant);
+    LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
+        this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
+  }
+
+  /**
+   * Initializes the instant.
+   *
+   * <p>Recommits the last inflight instant if its write metadata checkpointed successfully
+   * but the instant was not committed due to some rare case.
+   *
+   * <p>Starts a new instant; a writer cannot flush its data buffer
+   * until it finds a new inflight instant on the timeline.
+   */
+  private void initInstant(String instant) {
+    HoodieTimeline completedTimeline =
+        StreamerUtil.createMetaClient(conf).getActiveTimeline().filterCompletedInstants();
+    executor.execute(() -> {
+      if (instant.equals("") || completedTimeline.containsInstant(instant)) {
+        // the last instant committed successfully
+        reset();
+      } else {
+        LOG.info("Recommit instant {}", instant);
+        commitInstant(instant);
+      }
+      // starts a new instant
+      startInstant();
+    }, "initialize instant %s", instant);
+  }
+
+  private void handleBootstrapEvent(WriteMetadataEvent event) {
+    addEventToBuffer(event);
+    if (Arrays.stream(eventBuffer).allMatch(Objects::nonNull)) {
+      // start to initialize the instant.
+      initInstant(event.getInstantTime());
+    }
+  }
+
+  private void handleEndInputEvent(WriteMetadataEvent event) {
+    addEventToBuffer(event);
+    if (allEventsReceived()) {
+      // start to commit the instant.
+      commitInstant(this.instant);
+      // no compaction scheduling for batch mode
+    }
+  }
+
+  private void handleWriteMetaEvent(WriteMetadataEvent event) {
+    // The write task does not block after checkpointing (and before it receives a checkpoint success event);
+    // if the checkpoint succeeds and the task flushes the data buffer again before this coordinator receives the
+    // checkpoint success event, the data buffer would be flushed with an older instant time.
+    ValidationUtils.checkState(
+        HoodieTimeline.compareTimestamps(this.instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
+        String.format("Receive an unexpected event for instant %s from task %d",
+            event.getInstantTime(), event.getTaskID()));
+
+    addEventToBuffer(event);
+  }
+
   /**
    * Commits the instant.
    *
    * @return true if the write statuses are committed successfully.
    */
-  private boolean commitInstant() {
+  private boolean commitInstant(String instant) {
     if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
       // The last checkpoint finished successfully.
       return false;
@@ -314,7 +366,7 @@ public class StreamWriteOperatorCoordinator
 
     List<WriteStatus> writeResults = Arrays.stream(eventBuffer)
         .filter(Objects::nonNull)
-        .map(BatchWriteSuccessEvent::getWriteStatuses)
+        .map(WriteMetadataEvent::getWriteStatuses)
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
 
@@ -323,13 +375,15 @@ public class StreamWriteOperatorCoordinator
       reset();
       return false;
     }
-    doCommit(writeResults);
+    doCommit(instant, writeResults);
     return true;
   }
 
-  /** Performs the actual commit action. */
+  /**
+   * Performs the actual commit action.
+   */
   @SuppressWarnings("unchecked")
-  private void doCommit(List<WriteStatus> writeResults) {
+  private void doCommit(String instant, List<WriteStatus> writeResults) {
     // commit or rollback
     long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
     long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
@@ -345,13 +399,13 @@ public class StreamWriteOperatorCoordinator
       final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
           ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
           : Collections.emptyMap();
-      boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata),
+      boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata),
           tableState.commitAction, partitionToReplacedFileIds);
       if (success) {
         reset();
-        LOG.info("Commit instant [{}] success!", this.instant);
+        LOG.info("Commit instant [{}] success!", instant);
       } else {
-        throw new HoodieException(String.format("Commit instant [%s] failed!", this.instant));
+        throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
       }
     } else {
       LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
@@ -364,13 +418,13 @@ public class StreamWriteOperatorCoordinator
         }
       });
       // Rolls back instant
-      writeClient.rollback(this.instant);
-      throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.instant));
+      writeClient.rollback(instant);
+      throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));
     }
   }
 
   @VisibleForTesting
-  public BatchWriteSuccessEvent[] getEventBuffer() {
+  public WriteMetadataEvent[] getEventBuffer() {
     return eventBuffer;
   }
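
Spelled out, the initInstant(String) method added above distinguishes three recovery
cases (a sketch of the decision, not exact code):

    // Case 1: instant.equals("")               -- fresh start, no state restored
    //   -> reset(), then startInstant()
    // Case 2: completedTimeline.containsInstant(instant)
    //   -> the restored instant already committed; reset(), then startInstant()
    // Case 3: otherwise                        -- the checkpoint succeeded but the commit did not
    //   -> commitInstant(instant) recommits the resent write metadata,
    //      then startInstant() lets the blocked write tasks flush again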
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index f95f3e3..289d4f6 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -58,7 +58,7 @@ public class HoodieFlinkCompactor {
     Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
 
     // create metaClient
-    HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
 
     // get the table name
     conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
similarity index 81%
rename from hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java
rename to hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
index 186a470..662383b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
@@ -30,13 +30,14 @@ import java.util.Objects;
 /**
  * An operator event to mark successful checkpoint batch write.
  */
-public class BatchWriteSuccessEvent implements OperatorEvent {
+public class WriteMetadataEvent implements OperatorEvent {
   private static final long serialVersionUID = 1L;
 
   private List<WriteStatus> writeStatuses;
   private final int taskID;
   private String instantTime;
   private boolean isLastBatch;
+
   /**
    * Flag saying whether the event comes from the end of input, e.g. the source
    * is bounded, there are two cases in which this flag should be set to true:
@@ -46,6 +47,11 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
   private final boolean isEndInput;
 
   /**
+   * Flag saying whether the event comes from bootstrap of a write function.
+   */
+  private final boolean isBootstrap;
+
+  /**
    * Creates an event.
    *
    * @param taskID        The task ID
@@ -55,22 +61,25 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
    *                      within an checkpoint interval,
    *                      if true, the whole data set of the checkpoint
    *                      has been flushed successfully
+   * @param isBootstrap   Whether the event comes from the bootstrap
    */
-  private BatchWriteSuccessEvent(
+  private WriteMetadataEvent(
       int taskID,
       String instantTime,
       List<WriteStatus> writeStatuses,
       boolean isLastBatch,
-      boolean isEndInput) {
+      boolean isEndInput,
+      boolean isBootstrap) {
     this.taskID = taskID;
     this.instantTime = instantTime;
     this.writeStatuses = new ArrayList<>(writeStatuses);
     this.isLastBatch = isLastBatch;
     this.isEndInput = isEndInput;
+    this.isBootstrap = isBootstrap;
   }
 
   /**
-   * Returns the builder for {@link BatchWriteSuccessEvent}.
+   * Returns the builder for {@link WriteMetadataEvent}.
    */
   public static Builder builder() {
     return new Builder();
@@ -96,12 +105,16 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
     return isEndInput;
   }
 
+  public boolean isBootstrap() {
+    return isBootstrap;
+  }
+
   /**
-   * Merges this event with given {@link BatchWriteSuccessEvent} {@code other}.
+   * Merges this event with given {@link WriteMetadataEvent} {@code other}.
    *
    * @param other The event to be merged
    */
-  public void mergeWith(BatchWriteSuccessEvent other) {
+  public void mergeWith(WriteMetadataEvent other) {
     ValidationUtils.checkArgument(this.taskID == other.taskID);
     // the instant time could be monotonically increasing
     this.instantTime = other.instantTime;
@@ -112,7 +125,9 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
     this.writeStatuses = statusList;
   }
 
-  /** Returns whether the event is ready to commit. */
+  /**
+   * Returns whether the event is ready to commit.
+   */
   public boolean isReady(String currentInstant) {
     return isLastBatch && this.instantTime.equals(currentInstant);
   }
@@ -122,7 +137,7 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
   // -------------------------------------------------------------------------
 
   /**
-   * Builder for {@link BatchWriteSuccessEvent}.
+   * Builder for {@link WriteMetadataEvent}.
    */
   public static class Builder {
     private List<WriteStatus> writeStatus;
@@ -130,12 +145,13 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
     private String instantTime;
     private boolean isLastBatch = false;
     private boolean isEndInput = false;
+    private boolean isBootstrap = false;
 
-    public BatchWriteSuccessEvent build() {
+    public WriteMetadataEvent build() {
       Objects.requireNonNull(taskID);
       Objects.requireNonNull(instantTime);
       Objects.requireNonNull(writeStatus);
-      return new BatchWriteSuccessEvent(taskID, instantTime, writeStatus, isLastBatch, isEndInput);
+      return new WriteMetadataEvent(taskID, instantTime, writeStatus, isLastBatch, isEndInput, isBootstrap);
     }
 
     public Builder taskID(int taskID) {
@@ -162,5 +178,10 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
       this.isEndInput = isEndInput;
       return this;
     }
+
+    public Builder isBootstrap(boolean isBootstrap) {
+      this.isBootstrap = isBootstrap;
+      return this;
+    }
   }
 }
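
For reference, building the new bootstrap event through the builder looks exactly
like the test setup later in this patch:

    WriteMetadataEvent bootstrap = WriteMetadataEvent.builder()
        .taskID(0)
        .instantTime("")                      // no instant restored on a fresh start
        .writeStatus(Collections.emptyList()) // nothing written yet
        .isBootstrap(true)
        .build();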
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 3d98ce9..e8927dc 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -26,11 +26,11 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.table.HoodieFlinkTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,13 +44,6 @@ public class CompactionUtil {
   private static final Logger LOG = LoggerFactory.getLogger(CompactionUtil.class);
 
   /**
-   * Creates the metaClient.
-   */
-  public static HoodieTableMetaClient createMetaClient(Configuration conf) {
-    return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build();
-  }
-
-  /**
    * Gets compaction Instant time.
    */
   public static String getCompactionInstantTime(HoodieTableMetaClient metaClient) {
@@ -72,7 +65,7 @@ public class CompactionUtil {
    * Sets up the avro schema string into the give configuration {@code conf}
    * through reading from the hoodie table metadata.
    *
-   * @param conf    The configuration
+   * @param conf The configuration
    */
   public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index e20f34f..d73b300 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -154,7 +154,7 @@ public class StreamerUtil {
                     .withMaxMemoryMaxSize(
                         conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY) * 1024 * 1024L,
                         conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L
-                        ).build())
+                    ).build())
             .forTable(conf.getString(FlinkOptions.TABLE_NAME))
             .withStorageConfig(HoodieStorageConfig.newBuilder()
                 .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024)
@@ -221,13 +221,16 @@ public class StreamerUtil {
     // some of the filesystems release the handles in #close method.
   }
 
-  /** Generates the bucket ID using format {partition path}_{fileID}. */
+  /**
+   * Generates the bucket ID using format {partition path}_{fileID}.
+   */
   public static String generateBucketKey(String partitionPath, String fileId) {
     return String.format("%s_%s", partitionPath, fileId);
   }
 
   /**
    * Returns whether needs to schedule the async compaction.
+   *
    * @param conf The flink configuration.
    */
   public static boolean needsAsyncCompaction(Configuration conf) {
@@ -238,6 +241,13 @@ public class StreamerUtil {
   }
 
   /**
+   * Creates the meta client.
+   */
+  public static HoodieTableMetaClient createMetaClient(Configuration conf) {
+    return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build();
+  }
+
+  /**
    * Creates the Flink write client.
    */
   public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
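
Both createMetaClient helpers (the FlinkClientUtil overload added earlier and this
StreamerUtil one) build the same HoodieTableMetaClient; typical call sites, as seen
elsewhere in this patch:

    // from a Flink Configuration carrying FlinkOptions.PATH
    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);

    // from an explicit base path string
    HoodieTableMetaClient client = FlinkClientUtil.createMetaClient(basePath);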
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 8e67159..be4c052 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -200,7 +200,7 @@ public class StreamWriteITCase extends TestLogger {
     conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
 
     // create metaClient
-    HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
 
     // set the table name
     conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index a2fdf22..612bb79 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -70,6 +70,23 @@ public class TestStreamWriteOperatorCoordinator {
         TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), context);
     coordinator.start();
     coordinator.setExecutor(new MockCoordinatorExecutor(context));
+
+    final WriteMetadataEvent event0 = WriteMetadataEvent.builder()
+        .taskID(0)
+        .instantTime("")
+        .writeStatus(Collections.emptyList())
+        .isBootstrap(true)
+        .build();
+
+    final WriteMetadataEvent event1 = WriteMetadataEvent.builder()
+        .taskID(1)
+        .instantTime("")
+        .writeStatus(Collections.emptyList())
+        .isBootstrap(true)
+        .build();
+
+    coordinator.handleEventFromOperator(0, event0);
+    coordinator.handleEventFromOperator(1, event1);
   }
 
   @AfterEach
@@ -85,7 +102,7 @@ public class TestStreamWriteOperatorCoordinator {
     WriteStatus writeStatus = new WriteStatus(true, 0.1D);
     writeStatus.setPartitionPath("par1");
     writeStatus.setStat(new HoodieWriteStat());
-    OperatorEvent event0 = BatchWriteSuccessEvent.builder()
+    OperatorEvent event0 = WriteMetadataEvent.builder()
         .taskID(0)
         .instantTime(instant)
         .writeStatus(Collections.singletonList(writeStatus))
@@ -95,7 +112,7 @@ public class TestStreamWriteOperatorCoordinator {
     WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
     writeStatus1.setPartitionPath("par2");
     writeStatus1.setStat(new HoodieWriteStat());
-    OperatorEvent event1 = BatchWriteSuccessEvent.builder()
+    OperatorEvent event1 = WriteMetadataEvent.builder()
         .taskID(1)
         .instantTime(instant)
         .writeStatus(Collections.singletonList(writeStatus1))
@@ -132,7 +149,7 @@ public class TestStreamWriteOperatorCoordinator {
   public void testReceiveInvalidEvent() {
     CompletableFuture<byte[]> future = new CompletableFuture<>();
     coordinator.checkpointCoordinator(1, future);
-    OperatorEvent event = BatchWriteSuccessEvent.builder()
+    OperatorEvent event = WriteMetadataEvent.builder()
         .taskID(0)
         .instantTime("abc")
         .writeStatus(Collections.emptyList())
@@ -147,7 +164,7 @@ public class TestStreamWriteOperatorCoordinator {
     final CompletableFuture<byte[]> future = new CompletableFuture<>();
     coordinator.checkpointCoordinator(1, future);
     String instant = coordinator.getInstant();
-    OperatorEvent event = BatchWriteSuccessEvent.builder()
+    OperatorEvent event = WriteMetadataEvent.builder()
         .taskID(0)
         .instantTime(instant)
         .writeStatus(Collections.emptyList())
@@ -163,7 +180,7 @@ public class TestStreamWriteOperatorCoordinator {
     WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
     writeStatus1.setPartitionPath("par2");
     writeStatus1.setStat(new HoodieWriteStat());
-    OperatorEvent event1 = BatchWriteSuccessEvent.builder()
+    OperatorEvent event1 = WriteMetadataEvent.builder()
         .taskID(1)
         .instantTime(instant)
         .writeStatus(Collections.singletonList(writeStatus1))
@@ -186,20 +203,30 @@ public class TestStreamWriteOperatorCoordinator {
     coordinator.start();
     coordinator.setExecutor(new MockCoordinatorExecutor(context));
 
+    final WriteMetadataEvent event0 = WriteMetadataEvent.builder()
+        .taskID(0)
+        .instantTime("")
+        .writeStatus(Collections.emptyList())
+        .isBootstrap(true)
+        .build();
+
+    coordinator.handleEventFromOperator(0, event0);
+
     String instant = coordinator.getInstant();
     assertNotEquals("", instant);
 
     WriteStatus writeStatus = new WriteStatus(true, 0.1D);
     writeStatus.setPartitionPath("par1");
     writeStatus.setStat(new HoodieWriteStat());
-    OperatorEvent event0 = BatchWriteSuccessEvent.builder()
+
+    OperatorEvent event1 = WriteMetadataEvent.builder()
         .taskID(0)
         .instantTime(instant)
         .writeStatus(Collections.singletonList(writeStatus))
         .isLastBatch(true)
         .build();
 
-    coordinator.handleEventFromOperator(0, event0);
+    coordinator.handleEventFromOperator(0, event1);
 
     // never throw for hive synchronization now
     assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1));
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index e1cb99e..90a3b34 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -135,8 +135,8 @@ public class TestWriteCopyOnWrite {
     String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
 
     final OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
-    List<WriteStatus> writeStatuses = ((BatchWriteSuccessEvent) nextEvent).getWriteStatuses();
+    MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
+    List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
     assertNotNull(writeStatuses);
     MatcherAssert.assertThat(writeStatuses.size(), is(4)); // write 4 partition files
     assertThat(writeStatuses.stream()
@@ -162,8 +162,8 @@ public class TestWriteCopyOnWrite {
     assertNotEquals(instant, instant2);
 
     final OperatorEvent nextEvent2 = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent2, instanceOf(BatchWriteSuccessEvent.class));
-    List<WriteStatus> writeStatuses2 = ((BatchWriteSuccessEvent) nextEvent2).getWriteStatuses();
+    assertThat("The operator expect to send an event", nextEvent2, instanceOf(WriteMetadataEvent.class));
+    List<WriteStatus> writeStatuses2 = ((WriteMetadataEvent) nextEvent2).getWriteStatuses();
     assertNotNull(writeStatuses2);
     assertThat(writeStatuses2.size(), is(0)); // write empty statuses
 
@@ -191,8 +191,8 @@ public class TestWriteCopyOnWrite {
     assertNotNull(instant);
 
     final OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
-    List<WriteStatus> writeStatuses = ((BatchWriteSuccessEvent) nextEvent).getWriteStatuses();
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
+    List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
     assertNotNull(writeStatuses);
     assertThat(writeStatuses.size(), is(0)); // no data write
 
@@ -210,7 +210,9 @@ public class TestWriteCopyOnWrite {
     }
 
     // this returns early because there is no inflight instant
-    funcWrapper.checkpointFunction(2);
+    assertThrows(HoodieException.class,
+        () -> funcWrapper.checkpointFunction(2),
+        "Timeout(0ms) while waiting for");
     // do not sent the write event and fails the checkpoint,
     // behaves like the last checkpoint is successful.
     funcWrapper.checkpointFails(2);
@@ -232,7 +234,7 @@ public class TestWriteCopyOnWrite {
         .getLastPendingInstant(getTableType());
 
     final OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -262,7 +264,7 @@ public class TestWriteCopyOnWrite {
     funcWrapper.checkpointFunction(1);
 
     OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -298,7 +300,7 @@ public class TestWriteCopyOnWrite {
     funcWrapper.checkpointFunction(1);
 
     OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -318,7 +320,7 @@ public class TestWriteCopyOnWrite {
         .getLastPendingInstant(getTableType());
 
     nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -343,7 +345,7 @@ public class TestWriteCopyOnWrite {
     funcWrapper.checkpointFunction(1);
 
     OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -363,7 +365,7 @@ public class TestWriteCopyOnWrite {
         .getLastPendingInstant(getTableType());
 
     nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -408,7 +410,7 @@ public class TestWriteCopyOnWrite {
 
     final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
     final OperatorEvent event2 = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class));
+    assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
     funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
@@ -470,7 +472,7 @@ public class TestWriteCopyOnWrite {
 
     final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
     final OperatorEvent event2 = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class));
+    assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
     funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
@@ -534,7 +536,7 @@ public class TestWriteCopyOnWrite {
 
     for (int i = 0; i < 2; i++) {
       final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
-      assertThat("The operator expect to send an event", event, instanceOf(BatchWriteSuccessEvent.class));
+      assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
       funcWrapper.getCoordinator().handleEventFromOperator(0, event);
     }
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -592,7 +594,7 @@ public class TestWriteCopyOnWrite {
     funcWrapper.checkpointFunction(1);
 
     OperatorEvent nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -634,7 +636,7 @@ public class TestWriteCopyOnWrite {
         .getLastPendingInstant(getTableType());
 
     nextEvent = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
     checkWrittenData(tempFile, EXPECTED2);
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
@@ -673,7 +675,7 @@ public class TestWriteCopyOnWrite {
 
     for (int i = 0; i < 2; i++) {
       final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
-      assertThat("The operator expect to send an event", event, instanceOf(BatchWriteSuccessEvent.class));
+      assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
       funcWrapper.getCoordinator().handleEventFromOperator(0, event);
     }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 84ba9da..e78456b 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -27,7 +27,7 @@ import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bootstrap.BootstrapFunction;
 import org.apache.hudi.sink.bootstrap.IndexRecord;
-import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignOperator;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
@@ -70,15 +70,25 @@ public class StreamWriteFunctionWrapper<I> {
   private final StreamWriteOperatorCoordinator coordinator;
   private final MockFunctionInitializationContext functionInitializationContext;
 
-  /** Function that converts row data to HoodieRecord. */
+  /**
+   * Function that converts row data to HoodieRecord.
+   */
   private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
-  /** Function that load index in state. */
+  /**
+   * Function that load index in state.
+   */
   private BootstrapFunction<HoodieRecord<?>, HoodieRecord<?>> bootstrapFunction;
-  /** Function that assigns bucket ID. */
+  /**
+   * Function that assigns bucket ID.
+   */
   private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> bucketAssignerFunction;
-  /** BucketAssignOperator context. **/
+  /**
+   * BucketAssignOperator context.
+   **/
   private MockBucketAssignOperatorContext bucketAssignOperatorContext;
-  /** Stream write function. */
+  /**
+   * Stream write function.
+   */
   private StreamWriteFunction<Object, HoodieRecord<?>, Object> writeFunction;
 
   private CompactFunctionWrapper compactFunctionWrapper;
@@ -133,8 +143,12 @@ public class StreamWriteFunctionWrapper<I> {
     writeFunction = new StreamWriteFunction<>(conf);
     writeFunction.setRuntimeContext(runtimeContext);
     writeFunction.setOperatorEventGateway(gateway);
+    writeFunction.initializeState(this.functionInitializationContext);
     writeFunction.open(conf);
 
+    // handle the bootstrap event
+    coordinator.handleEventFromOperator(0, getNextEvent());
+
     if (asyncCompaction) {
       compactFunctionWrapper.openFunction();
     }
@@ -184,7 +198,7 @@ public class StreamWriteFunctionWrapper<I> {
     writeFunction.processElement(hoodieRecords[0], null, null);
   }
 
-  public BatchWriteSuccessEvent[] getEventBuffer() {
+  public WriteMetadataEvent[] getEventBuffer() {
     return this.coordinator.getEventBuffer();
   }
 
@@ -201,7 +215,7 @@ public class StreamWriteFunctionWrapper<I> {
     return this.writeFunction.getWriteClient();
   }
 
-  public void checkpointFunction(long checkpointId) {
+  public void checkpointFunction(long checkpointId) throws Exception {
     // checkpoint the coordinator first
     this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
     bucketAssignerFunction.snapshotState(null);