You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by wa...@apache.org on 2021/06/08 05:55:47 UTC

[hudi] branch master updated: add BootstrapFunction to support index bootstrap (#3024)

This is an automated email from the ASF dual-hosted git repository.

wangxianghu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cf83f10  add BootstrapFunction to support index bootstrap (#3024)
cf83f10 is described below

commit cf83f10f5b369e1f16e63b4b68750d3ad2dc0240
Author: yuzhaojing <32...@users.noreply.github.com>
AuthorDate: Tue Jun 8 13:55:25 2021 +0800

    add BootstrapFunction to support index bootstrap (#3024)
    
    Co-authored-by: 喻兆靖 <yu...@bilibili.com>
---
 .../apache/hudi/configuration/FlinkOptions.java    |   6 +
 .../hudi/sink/bootstrap/BootstrapFunction.java     | 235 +++++++++++++++++++++
 .../hudi/sink/bootstrap/BootstrapRecord.java       |  33 +++
 .../bootstrap/aggregate/BootstrapAccumulator.java  |  53 +++++
 .../sink/bootstrap/aggregate/BootstrapAggFunc.java |  49 +++++
 .../sink/partitioner/BucketAssignFunction.java     | 102 ++-------
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |  21 +-
 .../org/apache/hudi/table/HoodieTableSink.java     |  21 +-
 .../org/apache/hudi/sink/StreamWriteITCase.java    |  28 ++-
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |  28 ++-
 .../sink/utils/StreamWriteFunctionWrapper.java     |  44 ++++
 11 files changed, 507 insertions(+), 113 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 7466b41..57fd501 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -93,6 +93,12 @@ public class FlinkOptions {
       .withDescription("Whether to update index for the old partition path\n"
           + "if same key record with different partition path came in, default false");
 
+  public static final ConfigOption<String> INDEX_PARTITION_REGEX = ConfigOptions
+      .key("index.partition.regex")
+      .stringType()
+      .defaultValue(".*")
+      .withDescription("Regex of the partition paths whose index should be loaded into state, default '.*' (all partitions)");
+
   // ------------------------------------------------------------------------
   //  Read Options
   // ------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
new file mode 100644
index 0000000..5f81559
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunc;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/**
+ * The function to load the index from an existing hoodie table.
+ *
+ * <p>Each subtask triggers the index bootstrap when it receives its first element;
+ * the received records are not sent downstream until the index of all the subtasks is sent.
+ *
+ * <p>The output records should then shuffle by the recordKey and thus do scalable write.
+ *
+ * @see BootstrapRecord
+ */
+public class BootstrapFunction<I, O extends HoodieRecord>
+      extends ProcessFunction<I, O>
+      implements CheckpointedFunction, CheckpointListener {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class);
+
+  private final Configuration conf;
+  private final Pattern pattern;
+
+  private transient HoodieTable<?, ?, ?, ?> hoodieTable;
+  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+  private transient GlobalAggregateManager aggregateManager;
+  private transient ListState<Boolean> bootstrapState;
+
+  private boolean alreadyBootstrap;
+
+  public BootstrapFunction(Configuration conf) {
+    this.conf = conf;
+    this.pattern = Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX));
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws Exception {
+    this.bootstrapState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>("bootstrap-state", TypeInformation.of(new TypeHint<Boolean>() {})));
+
+    if (context.isRestored()) {
+      LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+      // the last restored flag wins if more than one entry is assigned to this subtask
+      for (Boolean restored : bootstrapState.get()) {
+        this.alreadyBootstrap = restored;
+      }
+    }
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
+    this.hoodieTable = getTable();
+    this.aggregateManager = ((StreamingRuntimeContext) getRuntimeContext()).getGlobalAggregateManager();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
+    if (!alreadyBootstrap) {
+      LOG.info("Start loading records in table {} into the index state, taskId = {}", conf.getString(FlinkOptions.PATH), getRuntimeContext().getIndexOfThisSubtask());
+      String basePath = hoodieTable.getMetaClient().getBasePath();
+      for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
+        if (pattern.matcher(partitionPath).matches()) {
+          loadRecords(partitionPath, out);
+        }
+      }
+
+      // wait for the other bootstrap tasks to report that their index is sent.
+      updateAndWaiting();
+
+      alreadyBootstrap = true;
+      LOG.info("Finish send index to BucketAssign, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
+    }
+
+    // send data to next operator
+    out.collect((O) value);
+  }
+
+  /**
+   * Reports this subtask as ready and blocks until all the bootstrap subtasks are ready.
+   */
+  private void updateAndWaiting() {
+    int taskNum = getRuntimeContext().getNumberOfParallelSubtasks();
+    int readyTaskNum = 1;
+    while (taskNum != readyTaskNum) {
+      try {
+        readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunc.NAME, getRuntimeContext().getIndexOfThisSubtask(), new BootstrapAggFunc());
+        LOG.info("Waiting for others bootstrap task complete, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
+        TimeUnit.SECONDS.sleep(5);
+      } catch (InterruptedException e) {
+        // restore the interrupt flag and fail fast so that the task can be cancelled
+        Thread.currentThread().interrupt();
+        throw new HoodieException("Interrupted while waiting for other bootstrap tasks to complete", e);
+      } catch (Exception e) {
+        LOG.warn("update global aggregate error", e);
+      }
+    }
+  }
+
+  private HoodieFlinkTable getTable() {
+    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+    HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(this.hadoopConf),
+        new FlinkTaskContextSupplier(getRuntimeContext()));
+    return HoodieFlinkTable.create(writeConfig, context);
+  }
+
+  /**
+   * Loads the record keys of the latest base files under a partition and sends them downstream.
+   *
+   * @param partitionPath The partition path
+   * @param out           The collector used to emit the index records
+   */
+  @SuppressWarnings("unchecked")
+  private void loadRecords(String partitionPath, Collector<O> out) {
+    long start = System.currentTimeMillis();
+    BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
+    List<HoodieBaseFile> latestBaseFiles =
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, this.hoodieTable);
+    LOG.info("All baseFile in partition {} size = {}", partitionPath, latestBaseFiles.size());
+
+    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+    final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+    final int taskID = getRuntimeContext().getIndexOfThisSubtask();
+    for (HoodieBaseFile baseFile : latestBaseFiles) {
+      // only the subtask that the file id is assigned to loads the file
+      boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
+          baseFile.getFileId(), maxParallelism, parallelism) == taskID;
+
+      if (shouldLoad) {
+        LOG.info("Load records from file {}.", baseFile);
+        final List<HoodieKey> hoodieKeys;
+        try {
+          hoodieKeys = fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
+        } catch (Exception e) {
+          throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
+        }
+
+        for (HoodieKey hoodieKey : hoodieKeys) {
+          out.collect((O) new BootstrapRecord(generateHoodieRecord(hoodieKey, baseFile)));
+        }
+      }
+    }
+
+    long cost = System.currentTimeMillis() - start;
+    LOG.info("Task [{}:{}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.",
+        this.getClass().getSimpleName(), taskID, partitionPath, cost);
+  }
+
+  /** Creates a sealed record that carries only the key and its location in the given base file. */
+  @SuppressWarnings("unchecked")
+  public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, HoodieBaseFile baseFile) {
+    HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, null);
+    hoodieRecord.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), baseFile.getCommitTime(), baseFile.getFileId()));
+    hoodieRecord.seal();
+    return hoodieRecord;
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    // replace the previous flag: without clear() every checkpoint appends one more entry
+    this.bootstrapState.clear();
+    this.bootstrapState.add(alreadyBootstrap);
+  }
+
+  @VisibleForTesting
+  public boolean isAlreadyBootstrap() {
+    return alreadyBootstrap;
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java
new file mode 100644
index 0000000..025d844
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+/**
+ * A {@link HoodieRecord} that marks an index record loaded from the existing table (not a regular input record).
+ */
+public class BootstrapRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> {
+  private static final long serialVersionUID = 1L;
+
+  public BootstrapRecord(HoodieRecord<T> record) {
+    super(record);
+  }
+}
\ No newline at end of file
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java
new file mode 100644
index 0000000..80067f0
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap.aggregate;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Aggregate accumulator that collects the ids of the tasks that reported themselves ready.
+ *
+ * <p>Used by {@link BootstrapAggFunc}; the number of distinct ids is the aggregate result.
+ */
+public class BootstrapAccumulator implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final Set<Integer> readyTaskSet = new HashSet<>();
+
+  /** Marks the given task as ready; a repeated id is recorded only once. */
+  public void update(int taskId) {
+    readyTaskSet.add(taskId);
+  }
+
+  /** Returns the number of distinct tasks recorded so far. */
+  public int readyTaskNum() {
+    return readyTaskSet.size();
+  }
+
+  /** Adds the tasks of {@code acc} (ignored when null) to this accumulator and returns this. */
+  public BootstrapAccumulator merge(BootstrapAccumulator acc) {
+    if (acc != null) {
+      readyTaskSet.addAll(acc.readyTaskSet);
+    }
+
+    return this;
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java
new file mode 100644
index 0000000..2233e84
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap.aggregate;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+/**
+ * Aggregate function that counts the distinct subtasks of {@link org.apache.hudi.sink.bootstrap.BootstrapFunction} that have reported their task id.
+ */
+public class BootstrapAggFunc implements AggregateFunction<Integer, BootstrapAccumulator, Integer> {
+  public static final String NAME = BootstrapAggFunc.class.getSimpleName();
+
+  @Override
+  public BootstrapAccumulator createAccumulator() {
+    return new BootstrapAccumulator();
+  }
+
+  @Override
+  public BootstrapAccumulator add(Integer taskId, BootstrapAccumulator bootstrapAccumulator) {
+    bootstrapAccumulator.update(taskId);
+    return bootstrapAccumulator;
+  }
+
+  @Override
+  public Integer getResult(BootstrapAccumulator bootstrapAccumulator) {
+    return bootstrapAccumulator.readyTaskNum();
+  }
+
+  @Override
+  public BootstrapAccumulator merge(BootstrapAccumulator bootstrapAccumulator, BootstrapAccumulator acc) {
+    return bootstrapAccumulator.merge(acc);
+  }
+}
\ No newline at end of file
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 36014a8..00a8274 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -22,43 +22,32 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.BaseAvroPayload;
-import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.sink.bootstrap.BootstrapRecord;
 import org.apache.hudi.sink.utils.PayloadCreation;
-import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.table.runtime.util.StateTtlConfigUtil;
 import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.List;
 import java.util.Objects;
 
 /**
@@ -80,8 +69,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     extends KeyedProcessFunction<K, I, O>
     implements CheckpointedFunction, CheckpointListener {
 
-  private static final Logger LOG = LoggerFactory.getLogger(BucketAssignFunction.class);
-
   private BucketAssignOperator.Context context;
 
   /**
@@ -108,13 +95,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
 
   private final boolean isChangingRecords;
 
-  private final boolean bootstrapIndex;
-
-  /**
-   * State to book-keep which partition is loaded into the index state {@code indexState}.
-   */
-  private MapState<String, Integer> partitionLoadState;
-
   /**
    * Used to create DELETE payload.
    */
@@ -130,7 +110,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     this.conf = conf;
     this.isChangingRecords = WriteOperationType.isChangingRecords(
         WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
-    this.bootstrapIndex = conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
     this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
   }
 
@@ -168,31 +147,29 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
       indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
     }
     indexState = context.getKeyedStateStore().getState(indexStateDesc);
-    if (bootstrapIndex) {
-      MapStateDescriptor<String, Integer> partitionLoadStateDesc =
-          new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
-      partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
-    }
   }
 
-  @SuppressWarnings("unchecked")
   @Override
   public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
+    if (value instanceof BootstrapRecord) {
+      BootstrapRecord bootstrapRecord = (BootstrapRecord) value;
+      this.context.setCurrentKey(bootstrapRecord.getRecordKey());
+      this.indexState.update((HoodieRecordGlobalLocation) bootstrapRecord.getCurrentLocation());
+    } else {
+      processRecord((HoodieRecord<?>) value, out);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
     // 1. put the record into the BucketAssigner;
     // 2. look up the state for location, if the record has a location, just send it out;
     // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
-    HoodieRecord<?> record = (HoodieRecord<?>) value;
     final HoodieKey hoodieKey = record.getKey();
     final String recordKey = hoodieKey.getRecordKey();
     final String partitionPath = hoodieKey.getPartitionPath();
     final HoodieRecordLocation location;
 
-    // The dataset may be huge, thus the processing would block for long,
-    // disabled by default.
-    if (bootstrapIndex && !partitionLoadState.contains(partitionPath)) {
-      // If the partition records are never loaded, load the records first.
-      loadRecords(partitionPath, recordKey);
-    }
     // Only changing records need looking up the index for the location,
     // append only records are always recognized as INSERT.
     HoodieRecordGlobalLocation oldLoc = indexState.value();
@@ -216,6 +193,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
       }
     } else {
       location = getNewRecordLocation(partitionPath);
+      this.context.setCurrentKey(recordKey);
       if (isChangingRecords) {
         updateIndexState(partitionPath, location);
       }
@@ -264,62 +242,8 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     this.context = context;
   }
 
-  /**
-   * Load all the indices of give partition path into the backup state.
-   *
-   * @param partitionPath The partition path
-   * @param curRecordKey  The current record key
-   * @throws Exception when error occurs for state update
-   */
-  private void loadRecords(String partitionPath, String curRecordKey) throws Exception {
-    LOG.info("Start loading records under partition {} into the index state", partitionPath);
-    HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(hoodieTable.getBaseFileFormat());
-    List<HoodieBaseFile> latestBaseFiles =
-        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable);
-    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
-    final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
-    final int taskID = getRuntimeContext().getIndexOfThisSubtask();
-    for (HoodieBaseFile baseFile : latestBaseFiles) {
-      final List<HoodieKey> hoodieKeys;
-      try {
-        hoodieKeys =
-            fileUtils.fetchRecordKeyPartitionPath(hadoopConf, new Path(baseFile.getPath()));
-      } catch (Exception e) {
-        // in case there was some empty parquet file when the pipeline
-        // crushes exceptionally.
-        LOG.error("Error when loading record keys from file: {}", baseFile);
-        continue;
-      }
-      hoodieKeys.forEach(hoodieKey -> {
-        try {
-          // Reference: org.apache.flink.streaming.api.datastream.KeyedStream,
-          // the input records is shuffled by record key
-          boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
-              hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID;
-          if (shouldLoad) {
-            this.context.setCurrentKey(hoodieKey.getRecordKey());
-            this.indexState.update(
-                new HoodieRecordGlobalLocation(
-                    hoodieKey.getPartitionPath(),
-                    baseFile.getCommitTime(),
-                    baseFile.getFileId()));
-          }
-        } catch (Exception e) {
-          LOG.error("Error when putting record keys into the state from file: {}", baseFile);
-        }
-      });
-    }
-    // recover the currentKey
-    this.context.setCurrentKey(curRecordKey);
-    // Mark the partition path as loaded.
-    partitionLoadState.put(partitionPath, 0);
-    LOG.info("Finish loading records under partition {} into the index state", partitionPath);
-  }
-
   @VisibleForTesting
   public void clearIndexState() {
-    this.partitionLoadState.clear();
     this.indexState.clear();
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index b0f7ada..9e77e73 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -22,11 +22,12 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperatorFactory;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.sink.compact.CompactionCommitSink;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.compact.CompactionPlanEvent;
-import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactFunction;
-import org.apache.hudi.sink.compact.CompactionCommitSink;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
@@ -50,6 +52,8 @@ import java.util.Properties;
 /**
  * An Utility which can incrementally consume data from Kafka and apply it to the target table.
  * currently, it only support COW table and insert, upsert operation.
+ *
+ * note: HoodieFlinkStreamer is not suitable to initialize on large tables when we have no checkpoint to restore from.
  */
 public class HoodieFlinkStreamer {
   public static void main(String[] args) throws Exception {
@@ -82,7 +86,7 @@ public class HoodieFlinkStreamer {
     StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
         new StreamWriteOperatorFactory<>(conf);
 
-    DataStream<Object> pipeline = env.addSource(new FlinkKafkaConsumer<>(
+    DataStream<HoodieRecord> hoodieDataStream = env.addSource(new FlinkKafkaConsumer<>(
         cfg.kafkaTopic,
         new JsonRowDataDeserializationSchema(
             rowType,
@@ -93,8 +97,15 @@ public class HoodieFlinkStreamer {
         ), kafkaProps))
         .name("kafka_source")
         .uid("uid_kafka_source")
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
-        // Key-by record key, to avoid multiple subtasks write to a partition at the same time
+        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
+              TypeInformation.of(HoodieRecord.class),
+              new ProcessOperator<>(new BootstrapFunction<>(conf)));
+    }
+
+    DataStream<Object> pipeline = hoodieDataStream
+        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
         .keyBy(HoodieRecord::getRecordKey)
         .transform(
             "bucket_assigner",
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 02e76aa..0d8f7f2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperatorFactory;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
 import org.apache.hudi.sink.compact.CompactFunction;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -38,6 +39,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -74,10 +76,19 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
       StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
 
-      DataStream<Object> pipeline = dataStream
-          .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
+      DataStream<HoodieRecord> hoodieDataStream = dataStream
+          .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+
+      // TODO: This is a very time-consuming operation, to be optimized
+      if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+        hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
+            TypeInformation.of(HoodieRecord.class),
+            new ProcessOperator<>(new BootstrapFunction<>(conf)));
+      }
+
+      DataStream<Object> pipeline = hoodieDataStream
           // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
           .keyBy(HoodieRecord::getRecordKey)
           .transform(
               "bucket_assigner",
               TypeInformation.of(HoodieRecord.class),
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 0590a23..ad9c9dc 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
 import org.apache.hudi.sink.compact.CompactFunction;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -49,6 +50,7 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
@@ -109,14 +111,22 @@ public class StreamWriteITCase extends TestLogger {
     String sourcePath = Objects.requireNonNull(Thread.currentThread()
         .getContextClassLoader().getResource("test_source.data")).toString();
 
-    DataStream<Object> dataStream = execEnv
+    DataStream<HoodieRecord> hoodieDataStream = execEnv
         // use continuous file source to trigger checkpoint
         .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
         .name("continuous_file_source")
         .setParallelism(1)
         .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
         .setParallelism(4)
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
+        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+
+    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
+              TypeInformation.of(HoodieRecord.class),
+              new ProcessOperator<>(new BootstrapFunction<>(conf)));
+    }
+
+    DataStream<Object> pipeline = hoodieDataStream
         // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
         .keyBy(HoodieRecord::getRecordKey)
         .transform(
@@ -128,7 +138,7 @@ public class StreamWriteITCase extends TestLogger {
         .keyBy(record -> record.getCurrentLocation().getFileId())
         .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
         .uid("uid_hoodie_stream_write");
-    execEnv.addOperator(dataStream.getTransformation());
+    execEnv.addOperator(pipeline.getTransformation());
 
     JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
     // wait for the streaming job to finish
@@ -171,12 +181,20 @@ public class StreamWriteITCase extends TestLogger {
     TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
     format.setCharsetName("UTF-8");
 
-    DataStream<Object> pipeline = execEnv
+    DataStream<HoodieRecord> hoodieDataStream = execEnv
         // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
         .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
         .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
         .setParallelism(4)
-        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
+            .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+
+    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+      hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
+              TypeInformation.of(HoodieRecord.class),
+              new ProcessOperator<>(new BootstrapFunction<>(conf)));
+    }
+
+    DataStream<Object> pipeline = hoodieDataStream
         // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
         .keyBy(HoodieRecord::getRecordKey)
         .transform(
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 247e17e..5e68b76 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -576,10 +576,6 @@ public class TestWriteCopyOnWrite {
 
   @Test
   public void testIndexStateBootstrap() throws Exception {
-    // reset the config option
-    conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
-    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
-
     // open the function and ingest data
     funcWrapper.openFunction();
     for (RowData rowData : TestData.DATA_SET_INSERT) {
@@ -598,13 +594,21 @@ public class TestWriteCopyOnWrite {
 
     funcWrapper.checkpointComplete(1);
 
-    // Mark the index state as not fully loaded to trigger re-load from the filesystem.
-    funcWrapper.clearIndexState();
+    // only the first batch has been written so far
+    checkWrittenData(tempFile, EXPECTED1);
+
+    // reset the config option
+    conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
     // upsert another data buffer
+    funcWrapper.openFunction();
     for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
       funcWrapper.invoke(rowData);
     }
+
+    assertTrue(funcWrapper.isAlreadyBootstrap());
+
     checkIndexLoaded(
         new HoodieKey("id1", "par1"),
         new HoodieKey("id2", "par1"),
@@ -613,11 +617,13 @@ public class TestWriteCopyOnWrite {
         new HoodieKey("id5", "par3"),
         new HoodieKey("id6", "par3"),
         new HoodieKey("id7", "par4"),
-        new HoodieKey("id8", "par4"));
-    // the data is not flushed yet
-    checkWrittenData(tempFile, EXPECTED1);
+        new HoodieKey("id8", "par4"),
+        new HoodieKey("id9", "par3"),
+        new HoodieKey("id10", "par4"),
+        new HoodieKey("id11", "par4"));
+
     // this triggers the data write and event send
-    funcWrapper.checkpointFunction(2);
+    funcWrapper.checkpointFunction(1);
 
     String instant = funcWrapper.getWriteClient()
         .getLastPendingInstant(getTableType());
@@ -631,7 +637,7 @@ public class TestWriteCopyOnWrite {
 
     checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
 
-    funcWrapper.checkpointComplete(2);
+    funcWrapper.checkpointComplete(1);
     // the coordinator checkpoint commits the inflight instant.
     checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
     checkWrittenData(tempFile, EXPECTED2);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 74e43ba..705780c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.sink.bootstrap.BootstrapRecord;
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
@@ -46,6 +48,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Collector;
 
 import java.util.HashSet;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -68,6 +71,8 @@ public class StreamWriteFunctionWrapper<I> {
 
   /** Function that converts row data to HoodieRecord. */
   private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
+  /** Function that loads the index into state. */
+  private BootstrapFunction<HoodieRecord<?>, HoodieRecord<?>> bootstrapFunction;
   /** Function that assigns bucket ID. */
   private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> bucketAssignerFunction;
   /** BucketAssignOperator context. **/
@@ -94,6 +99,8 @@ public class StreamWriteFunctionWrapper<I> {
     // one function
     this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
     this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
+    this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
+    this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
     this.functionInitializationContext = new MockFunctionInitializationContext();
     this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
     this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
@@ -106,6 +113,13 @@ public class StreamWriteFunctionWrapper<I> {
     toHoodieFunction.setRuntimeContext(runtimeContext);
     toHoodieFunction.open(conf);
 
+    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+      bootstrapFunction = new BootstrapFunction<>(conf);
+      bootstrapFunction.setRuntimeContext(runtimeContext);
+      bootstrapFunction.initializeState(this.functionInitializationContext);
+      bootstrapFunction.open(conf);
+    }
+
     bucketAssignerFunction = new BucketAssignFunction<>(conf);
     bucketAssignerFunction.setRuntimeContext(runtimeContext);
     bucketAssignerFunction.open(conf);
@@ -136,6 +150,32 @@ public class StreamWriteFunctionWrapper<I> {
 
       }
     };
+
+    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+      List<HoodieRecord<?>> bootstrapRecords = new ArrayList<>();
+
+      Collector<HoodieRecord<?>> bootstrapCollector = new Collector<HoodieRecord<?>>() {
+        @Override
+        public void collect(HoodieRecord<?> record) {
+          if (record instanceof BootstrapRecord) {
+            bootstrapRecords.add(record);
+          }
+        }
+
+        @Override
+        public void close() {
+
+        }
+      };
+
+      bootstrapFunction.processElement(hoodieRecord, null, bootstrapCollector);
+      for (HoodieRecord bootstrapRecord : bootstrapRecords) {
+        bucketAssignerFunction.processElement(bootstrapRecord, null, collector);
+      }
+
+      this.bucketAssignOperatorContext.setCurrentKey(hoodieRecord.getRecordKey());
+    }
+
     bucketAssignerFunction.processElement(hoodieRecord, null, collector);
     writeFunction.processElement(hoodieRecords[0], null, null);
   }
@@ -210,6 +250,10 @@ public class StreamWriteFunctionWrapper<I> {
     return this.writeFunction.isConfirming();
   }
 
+  public boolean isAlreadyBootstrap() {
+    return this.bootstrapFunction.isAlreadyBootstrap();
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------