Posted to commits@hudi.apache.org by vi...@apache.org on 2021/02/06 14:04:07 UTC

[hudi] branch master updated: [HUDI-1557] Make Flink write pipeline write task scalable (#2506)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c5b692  [HUDI-1557] Make Flink write pipeline write task scalable (#2506)
4c5b692 is described below

commit 4c5b6923ccfaaa6616a934a3f690b1a795a42d41
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sat Feb 6 22:03:52 2021 +0800

    [HUDI-1557] Make Flink write pipeline write task scalable (#2506)
    
    This is step 2 of RFC-24:
    https://cwiki.apache.org/confluence/display/HUDI/RFC+-+24%3A+Hoodie+Flink+Writer+Proposal
    
    This PR introduces a BucketAssigner that assigns a bucket ID (partition
    path & fileID) to each stream record.
    
    The downstream pipeline no longer needs to look up the index or
    partition these records: the write target location is decided before
    the write, and each record gets its location as soon as the
    BucketAssigner receives it, so the indexing happens in a streaming
    style.
    
    Computing the locations for a whole batch of records at once is
    resource consuming and puts pressure on the engine, which we should
    avoid in a streaming system.
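    
    As an illustrative aid only (not part of this patch), the Java sketch
    below shows the general shape of such streaming bucket assignment. The
    class name SketchBucketAssigner, its in-memory key-to-fileId map and
    the "I"/"U" instant-time markers are assumptions made for the example;
    the real logic lives in org.apache.hudi.operator.partitioner.BucketAssigner
    introduced by this change.
    
        import org.apache.hudi.common.model.HoodieRecord;
        import org.apache.hudi.common.model.HoodieRecordLocation;
        
        import java.util.HashMap;
        import java.util.Map;
        import java.util.UUID;
        
        // Sketch only: assign a location to each record as it arrives,
        // instead of looking up the index for a whole batch of records.
        public class SketchBucketAssigner {
        
          // record key -> fileId already assigned (e.g. kept in keyed state)
          private final Map<String, String> keyToFileId = new HashMap<>();
        
          public void assign(HoodieRecord<?> record) {
            String fileId = keyToFileId.get(record.getRecordKey());
            final HoodieRecordLocation location;
            if (fileId != null) {
              // known key: route to its existing file group as an update ("U")
              location = new HoodieRecordLocation("U", fileId);
            } else {
              // new key: open a fresh insert bucket ("I") for it
              fileId = UUID.randomUUID().toString();
              keyToFileId.put(record.getRecordKey(), fileId);
              location = new HoodieRecordLocation("I", fileId);
            }
            record.unseal();
            record.setCurrentLocation(location);
            record.seal();
          }
        }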
---
 .../java/org/apache/hudi/table/WorkloadStat.java   |   8 +-
 .../hudi/table/action/commit/BucketInfo.java       |  38 +++
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  15 +-
 .../hudi/index/state/FlinkInMemoryStateIndex.java  |  40 +--
 .../apache/hudi/io/FlinkCreateHandleFactory.java   |  41 +++
 .../commit/BaseFlinkCommitActionExecutor.java      | 199 ++++---------
 .../hudi/table/action/commit/FlinkWriteHelper.java |  46 ++-
 .../table/action/commit/UpsertPartitioner.java     |  10 +-
 .../table/action/commit/UpsertPartitioner.java     |  10 +-
 .../table/action/commit/UpsertPartitioner.java     |  10 +-
 .../table/timeline/HoodieActiveTimeline.java       |   6 +
 .../hudi/operator/InstantGenerateOperator.java     |  18 +-
 .../hudi/operator/KeyedWriteProcessFunction.java   |  50 +++-
 .../apache/hudi/operator/StreamWriteFunction.java  |  86 ++----
 .../apache/hudi/operator/StreamWriteOperator.java  |   5 +-
 .../operator/StreamWriteOperatorCoordinator.java   |  32 +-
 .../hudi/operator/StreamWriteOperatorFactory.java  |   4 +-
 .../operator/partitioner/BucketAssignFunction.java | 149 ++++++++++
 .../hudi/operator/partitioner/BucketAssigner.java  | 326 +++++++++++++++++++++
 .../transform/RowDataToHoodieFunction.java         | 108 +++++++
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |   3 +-
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |  19 +-
 .../hudi/streamer/HoodieFlinkStreamerV2.java       |  28 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  43 +++
 .../hudi/operator/StreamWriteFunctionTest.java     |  65 +++-
 .../apache/hudi/operator/StreamWriteITCase.java    | 119 +++++++-
 .../operator/partitioner/TestBucketAssigner.java   | 235 +++++++++++++++
 .../operator/utils/StreamWriteFunctionWrapper.java |  74 +++--
 .../hudi/operator/utils/TestConfigurations.java    |  13 +
 .../org/apache/hudi/operator/utils/TestData.java   |  28 +-
 30 files changed, 1435 insertions(+), 393 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java
index 6fdb217..c3371ba 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadStat.java
@@ -44,7 +44,13 @@ public class WorkloadStat implements Serializable {
   }
 
   public long addUpdates(HoodieRecordLocation location, long numUpdates) {
-    updateLocationToCount.put(location.getFileId(), Pair.of(location.getInstantTime(), numUpdates));
+    long accNumUpdates = 0;
+    if (updateLocationToCount.containsKey(location.getFileId())) {
+      accNumUpdates = updateLocationToCount.get(location.getFileId()).getRight();
+    }
+    updateLocationToCount.put(
+        location.getFileId(),
+        Pair.of(location.getInstantTime(), numUpdates + accNumUpdates));
     return this.numUpdates += numUpdates;
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
index 1d98ad4..6547da6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * Helper class for a bucket's type (INSERT and UPDATE) and its file location.
@@ -29,6 +30,24 @@ public class BucketInfo implements Serializable {
   String fileIdPrefix;
   String partitionPath;
 
+  public BucketInfo(BucketType bucketType, String fileIdPrefix, String partitionPath) {
+    this.bucketType = bucketType;
+    this.fileIdPrefix = fileIdPrefix;
+    this.partitionPath = partitionPath;
+  }
+
+  public BucketType getBucketType() {
+    return bucketType;
+  }
+
+  public String getFileIdPrefix() {
+    return fileIdPrefix;
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("BucketInfo {");
@@ -38,4 +57,23 @@ public class BucketInfo implements Serializable {
     sb.append('}');
     return sb.toString();
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    BucketInfo that = (BucketInfo) o;
+    return bucketType == that.bucketType
+        && fileIdPrefix.equals(that.fileIdPrefix)
+        && partitionPath.equals(that.partitionPath);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(bucketType, fileIdPrefix, partitionPath);
+  }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index e3e0eb4..0c87f7d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
@@ -249,7 +250,17 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   public void deletePendingInstant(String tableType, String instant) {
     HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    table.getMetaClient().getActiveTimeline()
-        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+    HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
+    activeTimeline.deletePending(HoodieInstant.State.INFLIGHT, commitType, instant);
+    activeTimeline.deletePending(HoodieInstant.State.REQUESTED, commitType, instant);
+  }
+
+  public void transitionRequestedToInflight(String tableType, String inFlightInstant) {
+    HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+    HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant);
+    activeTimeline.transitionRequestedToInflight(requested, Option.empty(),
+        config.shouldAllowMultiWriteOnSameInstant());
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
index 44eafd5..bae8de2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.FlinkHoodieIndex;
@@ -62,47 +61,14 @@ public class FlinkInMemoryStateIndex<T extends HoodieRecordPayload> extends Flin
   public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
                                            HoodieEngineContext context,
                                            HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
-    return context.map(records, record -> {
-      try {
-        if (mapState.contains(record.getKey())) {
-          record.unseal();
-          record.setCurrentLocation(mapState.get(record.getKey()));
-          record.seal();
-        }
-      } catch (Exception e) {
-        LOG.error(String.format("Tag record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-      }
-      return record;
-    }, 0);
+    throw new UnsupportedOperationException("No need to tag location for FlinkInMemoryStateIndex");
   }
 
   @Override
   public List<WriteStatus> updateLocation(List<WriteStatus> writeStatuses,
                                           HoodieEngineContext context,
                                           HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
-    return context.map(writeStatuses, writeStatus -> {
-      for (HoodieRecord record : writeStatus.getWrittenRecords()) {
-        if (!writeStatus.isErrored(record.getKey())) {
-          HoodieKey key = record.getKey();
-          Option<HoodieRecordLocation> newLocation = record.getNewLocation();
-          if (newLocation.isPresent()) {
-            try {
-              mapState.put(key, newLocation.get());
-            } catch (Exception e) {
-              LOG.error(String.format("Update record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-            }
-          } else {
-            // Delete existing index for a deleted record
-            try {
-              mapState.remove(key);
-            } catch (Exception e) {
-              LOG.error(String.format("Remove record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-            }
-          }
-        }
-      }
-      return writeStatus;
-    }, 0);
+    throw new UnsupportedOperationException("No need to update location for FlinkInMemoryStateIndex");
   }
 
   @Override
@@ -128,6 +94,6 @@ public class FlinkInMemoryStateIndex<T extends HoodieRecordPayload> extends Flin
    */
   @Override
   public boolean isImplicitWithStorage() {
-    return false;
+    return true;
   }
 }
\ No newline at end of file
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java
new file mode 100644
index 0000000..d65663e
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Create handle factory for the Flink writer; it uses the specified fileID
+ * directly because that ID is already unique.
+ */
+public class FlinkCreateHandleFactory<T extends HoodieRecordPayload, I, K, O>
+    extends CreateHandleFactory<T, I, K, O> {
+
+  @Override
+  public HoodieWriteHandle<T, I, K, O> create(
+      HoodieWriteConfig hoodieConfig, String commitTime,
+      HoodieTable<T, I, K, O> hoodieTable, String partitionPath,
+      String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
+    return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
+        fileIdPrefix, taskContextSupplier);
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index 337e7cb..044f841 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -20,51 +20,52 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.FlinkLazyInsertIterable;
-import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.io.FlinkCreateHandleFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieSortedMergeHandle;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.WorkloadProfile;
-import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import java.time.Instant;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import scala.Tuple2;
-
+/**
+ * With {@code org.apache.hudi.operator.partitioner.BucketAssigner}, each hoodie record
+ * is tagged with a bucket ID (partition path + fileID) in a streaming way. All the records consumed by this
+ * executor should already be tagged with bucket IDs and belong to a single data bucket.
+ *
+ * <p>These bucket IDs make it possible to shuffle the records by bucket ID first
+ * (see org.apache.hudi.operator.partitioner.BucketAssignFunction), so this executor
+ * only needs to handle the data buffer that belongs to one data bucket at a time and there is no need to
+ * partition the buffer.
+ *
+ * <p>Computing the locations for a whole batch of records at once puts pressure on the engine,
+ * which we should avoid in a streaming system.
+ */
 public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayload> extends
     BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, HoodieWriteMetadata> {
 
@@ -91,47 +92,39 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
-    updateIndex(writeStatuses, result);
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
+        ? BucketType.INSERT
+        : BucketType.UPDATE;
+    if (WriteOperationType.isChangingRecords(operationType)) {
+      handleUpsertPartition(
+          instantTime,
+          partitionPath,
+          fileId, bucketType,
+          inputRecords.iterator())
+          .forEachRemaining(writeStatuses::addAll);
+    } else {
+      handleUpsertPartition(
+          instantTime,
+          partitionPath,
+          fileId,
+          bucketType,
+          inputRecords.iterator())
+          .forEachRemaining(writeStatuses::addAll);
+    }
+    setUpWriteMetadata(writeStatuses, result);
     return result;
   }
 
-  protected void updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
-    Instant indexStartTime = Instant.now();
-    // Update the index back
-    List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
-    result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
+  protected void setUpWriteMetadata(
+      List<WriteStatus> statuses,
+      HoodieWriteMetadata<List<WriteStatus>> result) {
+    // No need to update the index because the update happens before the write.
     result.setWriteStatuses(statuses);
+    result.setIndexUpdateDuration(Duration.ZERO);
   }
 
   @Override
@@ -139,56 +132,6 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
     return table.getMetaClient().getCommitActionType();
   }
 
-  private Partitioner getPartitioner(WorkloadProfile profile) {
-    if (WriteOperationType.isChangingRecords(operationType)) {
-      return getUpsertPartitioner(profile);
-    } else {
-      return getInsertPartitioner(profile);
-    }
-  }
-
-  private Map<Integer, List<HoodieRecord<T>>> partition(List<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
-    Map<Integer, List<Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>>> partitionedMidRecords = dedupedRecords
-        .stream()
-        .map(record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record))
-        .collect(Collectors.groupingBy(x -> partitioner.getPartition(x._1)));
-    Map<Integer, List<HoodieRecord<T>>> results = new LinkedHashMap<>();
-    partitionedMidRecords.forEach((key, value) -> results.put(key, value.stream().map(x -> x._2).collect(Collectors.toList())));
-    return results;
-  }
-
-  protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(List<HoodieRecord<T>> inputRecords) {
-    HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
-    WorkloadStat globalStat = new WorkloadStat();
-
-    Map<Pair<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = inputRecords
-        .stream()
-        .map(record -> Pair.of(
-            Pair.of(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
-        .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));
-
-    for (Map.Entry<Pair<String, Option<HoodieRecordLocation>>, Long> e : partitionLocationCounts.entrySet()) {
-      String partitionPath = e.getKey().getLeft();
-      Long count = e.getValue();
-      Option<HoodieRecordLocation> locOption = e.getKey().getRight();
-
-      if (!partitionPathStatMap.containsKey(partitionPath)) {
-        partitionPathStatMap.put(partitionPath, new WorkloadStat());
-      }
-
-      if (locOption.isPresent()) {
-        // update
-        partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), count);
-        globalStat.addUpdates(locOption.get(), count);
-      } else {
-        // insert
-        partitionPathStatMap.get(partitionPath).addInserts(count);
-        globalStat.addInserts(count);
-      }
-    }
-    return Pair.of(partitionPathStatMap, globalStat);
-  }
-
   @Override
   protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result) {
     commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
@@ -228,31 +171,28 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
   }
 
   @SuppressWarnings("unchecked")
-  protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner partitioner) {
-    UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
-    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
-    BucketType btype = binfo.bucketType;
+  protected Iterator<List<WriteStatus>> handleUpsertPartition(
+      String instantTime,
+      String partitionPath,
+      String fileIdHint,
+      BucketType bucketType,
+      Iterator recordItr) {
     try {
-      if (btype.equals(BucketType.INSERT)) {
-        return handleInsert(binfo.fileIdPrefix, recordItr);
-      } else if (btype.equals(BucketType.UPDATE)) {
-        return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
-      } else {
-        throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
+      switch (bucketType) {
+        case INSERT:
+          return handleInsert(fileIdHint, recordItr);
+        case UPDATE:
+          return handleUpdate(partitionPath, fileIdHint, recordItr);
+        default:
+          throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath);
       }
     } catch (Throwable t) {
-      String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
+      String msg = "Error upserting bucketType " + bucketType + " for partition :" + partitionPath;
       LOG.error(msg, t);
       throw new HoodieUpsertException(msg, t);
     }
   }
 
-  protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner partitioner) {
-    return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
-  }
-
   @Override
   public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
                                                   Iterator<HoodieRecord<T>> recordItr)
@@ -293,13 +233,6 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
     }
   }
 
-  protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,
-                                              Map<String, HoodieRecord<T>> keyToNewRecords,
-                                              HoodieBaseFile dataFileToBeMerged) {
-    return new HoodieMergeHandle<>(config, instantTime, table, keyToNewRecords,
-        partitionPath, fileId, dataFileToBeMerged, taskContextSupplier);
-  }
-
   @Override
   public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
       throws Exception {
@@ -309,24 +242,6 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
       return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
     }
     return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
-        taskContextSupplier, new CreateHandleFactory<>());
+        taskContextSupplier, new FlinkCreateHandleFactory<>());
   }
-
-  /**
-   * Provides a partitioner to perform the upsert operation, based on the workload profile.
-   */
-  public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
-    if (profile == null) {
-      throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
-    }
-    return new UpsertPartitioner(profile, context, table, config);
-  }
-
-  /**
-   * Provides a partitioner to perform the insert operation, based on the workload profile.
-   */
-  public Partitioner getInsertPartitioner(WorkloadProfile profile) {
-    return getUpsertPartitioner(profile);
-  }
-
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 191071e..5238123 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -19,17 +19,32 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 
+import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+/**
+ * Overrides the {@link #write} method so that it neither looks up the index nor partitions the records, because
+ * with {@code org.apache.hudi.operator.partitioner.BucketAssigner}, each hoodie record
+ * is already tagged with a bucket ID (partition path + fileID) in a streaming way. The FlinkWriteHelper only hands
+ * the records over to the action executor {@link BaseCommitActionExecutor} to execute.
+ *
+ * <p>Computing the locations for a whole batch of records at once puts pressure on the engine,
+ * which we should avoid in a streaming system.
+ */
 public class FlinkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractWriteHelper<T, List<HoodieRecord<T>>,
     List<HoodieKey>, List<WriteStatus>, R> {
 
@@ -45,22 +60,45 @@ public class FlinkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractW
   }
 
   @Override
+  public HoodieWriteMetadata<List<WriteStatus>> write(String instantTime, List<HoodieRecord<T>> inputRecords, HoodieEngineContext context,
+                                                      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, boolean shouldCombine, int shuffleParallelism,
+                                                      BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor, boolean performTagging) {
+    try {
+      Instant lookupBegin = Instant.now();
+      Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
+
+      HoodieWriteMetadata<List<WriteStatus>> result = executor.execute(inputRecords);
+      result.setIndexLookupDuration(indexLookupDuration);
+      return result;
+    } catch (Throwable e) {
+      if (e instanceof HoodieUpsertException) {
+        throw (HoodieUpsertException) e;
+      }
+      throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
+    }
+  }
+
+  @Override
   public List<HoodieRecord<T>> deduplicateRecords(List<HoodieRecord<T>> records,
                                                      HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> index,
                                                      int parallelism) {
-    boolean isIndexingGlobal = index.isGlobal();
     Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
-      HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
-      Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
+      final Object key = record.getKey().getRecordKey();
       return Pair.of(key, record);
     }).collect(Collectors.groupingBy(Pair::getLeft));
 
     return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
       @SuppressWarnings("unchecked")
       T reducedData = (T) rec1.getData().preCombine(rec2.getData());
+      // we cannot allow the user to change the key or partitionPath,
+      // since that will affect everything,
+      // so pick it from one of the records.
       HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
-      return new HoodieRecord<T>(reducedKey, reducedData);
+      HoodieRecord<T> hoodieRecord = new HoodieRecord<>(reducedKey, reducedData);
+      // reuse the location from the first record.
+      hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
+      return hoodieRecord;
     }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 8cc9b0d..f44e83d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -116,10 +116,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Part
   private int addUpdateBucket(String partitionPath, String fileIdHint) {
     int bucket = totalBuckets;
     updateLocationToBucket.put(fileIdHint, bucket);
-    BucketInfo bucketInfo = new BucketInfo();
-    bucketInfo.bucketType = BucketType.UPDATE;
-    bucketInfo.fileIdPrefix = fileIdHint;
-    bucketInfo.partitionPath = partitionPath;
+    BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath);
     bucketInfoMap.put(totalBuckets, bucketInfo);
     totalBuckets++;
     return bucket;
@@ -186,10 +183,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Part
             } else {
               recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket);
             }
-            BucketInfo bucketInfo = new BucketInfo();
-            bucketInfo.bucketType = BucketType.INSERT;
-            bucketInfo.partitionPath = partitionPath;
-            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
+            BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
             bucketInfoMap.put(totalBuckets, bucketInfo);
             totalBuckets++;
           }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 4f19203..eeeeacf 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -114,10 +114,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Part
   private int addUpdateBucket(String partitionPath, String fileIdHint) {
     int bucket = totalBuckets;
     updateLocationToBucket.put(fileIdHint, bucket);
-    BucketInfo bucketInfo = new BucketInfo();
-    bucketInfo.bucketType = BucketType.UPDATE;
-    bucketInfo.fileIdPrefix = fileIdHint;
-    bucketInfo.partitionPath = partitionPath;
+    BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath);
     bucketInfoMap.put(totalBuckets, bucketInfo);
     totalBuckets++;
     return bucket;
@@ -184,10 +181,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Part
             } else {
               recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket);
             }
-            BucketInfo bucketInfo = new BucketInfo();
-            bucketInfo.bucketType = BucketType.INSERT;
-            bucketInfo.partitionPath = partitionPath;
-            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
+            BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
             bucketInfoMap.put(totalBuckets, bucketInfo);
             totalBuckets++;
           }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index ee153c8..9d60cde 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -120,10 +120,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
   private int addUpdateBucket(String partitionPath, String fileIdHint) {
     int bucket = totalBuckets;
     updateLocationToBucket.put(fileIdHint, bucket);
-    BucketInfo bucketInfo = new BucketInfo();
-    bucketInfo.bucketType = BucketType.UPDATE;
-    bucketInfo.fileIdPrefix = fileIdHint;
-    bucketInfo.partitionPath = partitionPath;
+    BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath);
     bucketInfoMap.put(totalBuckets, bucketInfo);
     totalBuckets++;
     return bucket;
@@ -223,10 +220,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
             } else {
               recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket);
             }
-            BucketInfo bucketInfo = new BucketInfo();
-            bucketInfo.bucketType = BucketType.INSERT;
-            bucketInfo.partitionPath = partitionPath;
-            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
+            BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
             bucketInfoMap.put(totalBuckets, bucketInfo);
             totalBuckets++;
           }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index fcb4fd9..865f0dc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -167,6 +167,12 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     deleteInstantFile(instant);
   }
 
+  public void deletePending(HoodieInstant.State state, String action, String instantStr) {
+    HoodieInstant instant = new HoodieInstant(state, action, instantStr);
+    ValidationUtils.checkArgument(!instant.isCompleted());
+    deleteInstantFile(instant);
+  }
+
   public void deleteCompactionRequested(HoodieInstant instant) {
     ValidationUtils.checkArgument(instant.isRequested());
     ValidationUtils.checkArgument(Objects.equals(instant.getAction(), HoodieTimeline.COMPACTION_ACTION));
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
index 5c9930d..dea8a05 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
@@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
@@ -115,7 +114,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
       writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
       // init table, create it if not exists.
-      initTable();
+      StreamerUtil.initTableIfNotExists(FlinkOptions.fromStreamerConfig(cfg));
 
       // create instant marker directory
       createInstantMarkerDir();
@@ -189,6 +188,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
    */
   private String startNewInstant(long checkpointId) {
     String newTime = writeClient.startCommit();
+    this.writeClient.transitionRequestedToInflight(this.cfg.tableType, newTime);
     LOG.info("create instant [{}], at checkpoint [{}]", newTime, checkpointId);
     return newTime;
   }
@@ -218,20 +218,6 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
     throw new InterruptedException(String.format("Last instant costs more than %s second, stop task now", retryTimes * retryInterval));
   }
 
-
-  /**
-   * Create table if not exists.
-   */
-  private void initTable() throws IOException {
-    if (!fs.exists(new Path(cfg.targetBasePath))) {
-      HoodieTableMetaClient.initTableType(new Configuration(serializableHadoopConf.get()), cfg.targetBasePath,
-          HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, 1);
-      LOG.info("Table initialized");
-    } else {
-      LOG.info("Table already [{}/{}] exists, do nothing here", cfg.targetBasePath, cfg.targetTableName);
-    }
-  }
-
   @Override
   public void close() throws Exception {
     if (writeClient != null) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
index a59a995..4309bb0 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.operator;
 
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.exception.HoodieFlinkStreamerException;
+import org.apache.hudi.table.action.commit.FlinkWriteHelper;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -40,8 +42,10 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A {@link KeyedProcessFunction} where the write operations really happens.
@@ -52,7 +56,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
   /**
    * Records buffer, will be processed in snapshotState function.
    */
-  private List<HoodieRecord> bufferedRecords = new LinkedList<>();
+  private Map<String, List<HoodieRecord>> bufferedRecords;
 
   /**
    * Flink collector help s to send data downstream.
@@ -88,6 +92,8 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
 
+    this.bufferedRecords = new LinkedHashMap<>();
+
     indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
 
     cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
@@ -112,17 +118,24 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
         String instantTimestamp = latestInstant;
         LOG.info("Write records, subtask id = [{}]  checkpoint_id = [{}}] instant = [{}], record size = [{}]", indexOfThisSubtask, context.getCheckpointId(), instantTimestamp, bufferedRecords.size());
 
-        List<WriteStatus> writeStatus;
-        switch (cfg.operation) {
-          case INSERT:
-            writeStatus = writeClient.insert(bufferedRecords, instantTimestamp);
-            break;
-          case UPSERT:
-            writeStatus = writeClient.upsert(bufferedRecords, instantTimestamp);
-            break;
-          default:
-            throw new HoodieFlinkStreamerException("Unknown operation : " + cfg.operation);
-        }
+        final List<WriteStatus> writeStatus = new ArrayList<>();
+        this.bufferedRecords.values().forEach(records -> {
+          if (records.size() > 0) {
+            if (cfg.filterDupes) {
+              records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
+            }
+            switch (cfg.operation) {
+              case INSERT:
+                writeStatus.addAll(writeClient.insert(records, instantTimestamp));
+                break;
+              case UPSERT:
+                writeStatus.addAll(writeClient.upsert(records, instantTimestamp));
+                break;
+              default:
+                throw new HoodieFlinkStreamerException("Unknown operation : " + cfg.operation);
+            }
+          }
+        });
         output.collect(new Tuple3<>(instantTimestamp, writeStatus, indexOfThisSubtask));
         bufferedRecords.clear();
       }
@@ -144,7 +157,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
     }
 
     // buffer the records
-    bufferedRecords.add(hoodieRecord);
+    putDataIntoBuffer(hoodieRecord);
   }
 
   public boolean hasRecordsIn() {
@@ -155,6 +168,15 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
     return latestInstant;
   }
 
+  private void putDataIntoBuffer(HoodieRecord<?> record) {
+    final String fileId = record.getCurrentLocation().getFileId();
+    final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
+    if (!this.bufferedRecords.containsKey(key)) {
+      this.bufferedRecords.put(key, new ArrayList<>());
+    }
+    this.bufferedRecords.get(key).add(record);
+  }
+
   @Override
   public void close() {
     if (writeClient != null) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
index 34a61d4..5877098 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
@@ -18,22 +18,18 @@
 
 package org.apache.hudi.operator;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
-import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.table.action.commit.FlinkWriteHelper;
 import org.apache.hudi.util.StreamerUtil;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
@@ -41,7 +37,6 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -50,7 +45,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
@@ -96,7 +93,7 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
   /**
    * Write buffer for a checkpoint.
    */
-  private transient List<HoodieRecord> buffer;
+  private transient Map<String, List<HoodieRecord>> buffer;
 
   /**
    * The buffer lock to control data buffering/flushing.
@@ -131,23 +128,6 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
   private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
 
   /**
-   * HoodieKey generator.
-   */
-  private transient KeyGenerator keyGenerator;
-
-  /**
-   * Row type of the input.
-   */
-  private final RowType rowType;
-
-  /**
-   * Avro schema of the input.
-   */
-  private final Schema avroSchema;
-
-  private transient RowDataToAvroConverters.RowDataToAvroConverter converter;
-
-  /**
    * The REQUESTED instant we write the data.
    */
   private volatile String currentInstant;
@@ -160,20 +140,15 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
   /**
    * Constructs a StreamingSinkFunction.
    *
-   * @param rowType The input row type
    * @param config  The config options
    */
-  public StreamWriteFunction(RowType rowType, Configuration config) {
-    this.rowType = rowType;
-    this.avroSchema = StreamerUtil.getSourceSchema(config);
+  public StreamWriteFunction(Configuration config) {
     this.config = config;
   }
 
   @Override
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-    this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
-    this.converter = RowDataToAvroConverters.createConverter(this.rowType);
     initBuffer();
     initWriteClient();
     initWriteFunction();
@@ -211,7 +186,7 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
       if (onCheckpointing) {
         addToBufferCondition.await();
       }
-      this.buffer.add(toHoodieRecord(value));
+      putDataIntoBuffer(value);
     } finally {
       bufferLock.unlock();
     }
@@ -230,7 +205,7 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
 
   @VisibleForTesting
   @SuppressWarnings("rawtypes")
-  public List<HoodieRecord> getBuffer() {
+  public Map<String, List<HoodieRecord>> getBuffer() {
     return buffer;
   }
 
@@ -249,7 +224,7 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
   // -------------------------------------------------------------------------
 
   private void initBuffer() {
-    this.buffer = new ArrayList<>();
+    this.buffer = new LinkedHashMap<>();
     this.bufferLock = new ReentrantLock();
     this.addToBufferCondition = this.bufferLock.newCondition();
   }
@@ -277,32 +252,33 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
     }
   }
 
-  /**
-   * Converts the give record to a {@link HoodieRecord}.
-   *
-   * @param record The input record
-   * @return HoodieRecord based on the configuration
-   * @throws IOException if error occurs
-   */
-  @SuppressWarnings("rawtypes")
-  private HoodieRecord toHoodieRecord(I record) throws IOException {
-    boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
-        || WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
-    GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
-    final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS);
-    Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
-        this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
-    HoodieRecordPayload payload = shouldCombine
-        ? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
-        : StreamerUtil.createPayload(payloadClazz, gr);
-    return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
+  private void putDataIntoBuffer(I value) {
+    HoodieRecord<?> record = (HoodieRecord<?>) value;
+    final String fileId = record.getCurrentLocation().getFileId();
+    final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
+    if (!this.buffer.containsKey(key)) {
+      this.buffer.put(key, new ArrayList<>());
+    }
+    this.buffer.get(key).add(record);
   }
 
+  @SuppressWarnings("unchecked, rawtypes")
   private void flushBuffer() {
     final List<WriteStatus> writeStatus;
     if (buffer.size() > 0) {
-      writeStatus = writeFunction.apply(buffer, currentInstant);
-      buffer.clear();
+      writeStatus = new ArrayList<>();
+      this.buffer.values()
+          // The records are partitioned by the bucket ID and each batch sent to
+          // the writer belongs to one bucket.
+          .forEach(records -> {
+            if (records.size() > 0) {
+              if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+                records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
+              }
+              writeStatus.addAll(writeFunction.apply(records, currentInstant));
+            }
+          });
+      this.buffer.clear();
     } else {
       LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
       writeStatus = Collections.emptyList();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java
index 3f4d940..247269c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.table.types.logical.RowType;
 
 /**
  * Operator for {@link StreamSink}.
@@ -36,8 +35,8 @@ public class StreamWriteOperator<I>
     implements OperatorEventHandler {
   private final StreamWriteFunction<Object, I, Object> sinkFunction;
 
-  public StreamWriteOperator(RowType rowType, Configuration conf) {
-    super(new StreamWriteFunction<>(rowType, conf));
+  public StreamWriteOperator(Configuration conf) {
+    super(new StreamWriteFunction<>(conf));
     this.sinkFunction = (StreamWriteFunction<Object, I, Object>) getUserFunction();
   }
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
index 524c601..bf0cfc2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
@@ -22,9 +22,6 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
@@ -38,8 +35,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,6 +54,8 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
+
 /**
  * {@link OperatorCoordinator} for {@link StreamWriteFunction}.
  *
@@ -121,7 +118,7 @@ public class StreamWriteOperatorCoordinator
     // writeClient
     initWriteClient();
     // init table, create it if not exists.
-    initTable();
+    initTableIfNotExists(this.conf);
   }
 
   @Override
@@ -139,6 +136,7 @@ public class StreamWriteOperatorCoordinator
           + " data has not finish writing, roll back the last write and throw";
       checkAndForceCommit(errMsg);
       this.inFlightInstant = this.writeClient.startCommit();
+      this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant);
       this.inFlightCheckpoint = checkpointId;
       LOG.info("Create instant [{}], at checkpoint [{}]", this.inFlightInstant, checkpointId);
       result.complete(writeCheckpointBytes());
@@ -200,28 +198,6 @@ public class StreamWriteOperatorCoordinator
         true);
   }
 
-  private void initTable() throws IOException {
-    final String basePath = this.conf.getString(FlinkOptions.PATH);
-    final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
-    // Hadoop FileSystem
-    try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
-      if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
-        HoodieTableMetaClient.initTableType(
-            hadoopConf,
-            basePath,
-            HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)),
-            this.conf.getString(FlinkOptions.TABLE_NAME),
-            "archived",
-            this.conf.getString(FlinkOptions.PAYLOAD_CLASS),
-            1);
-        LOG.info("Table initialized");
-      } else {
-        LOG.info("Table [{}/{}] already exists, no need to initialize the table",
-            basePath, this.conf.getString(FlinkOptions.TABLE_NAME));
-      }
-    }
-  }
-
   static byte[] readBytes(DataInputStream in, int size) throws IOException {
     byte[] bytes = new byte[size];
     in.readFully(bytes);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
index f5faa54..5626745 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.table.types.logical.RowType;
 
 /**
  * Factory class for {@link StreamWriteOperator}.
@@ -43,10 +42,9 @@ public class StreamWriteOperatorFactory<I>
   private final int numTasks;
 
   public StreamWriteOperatorFactory(
-      RowType rowType,
       Configuration conf,
       int numTasks) {
-    super(new StreamWriteOperator<>(rowType, conf));
+    super(new StreamWriteOperator<>(conf));
     this.operator = (StreamWriteOperator<I>) getOperator();
     this.conf = conf;
     this.numTasks = numTasks;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
new file mode 100644
index 0000000..269ccc8
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * The function that builds the write profile incrementally for records within a checkpoint,
+ * and then assigns each record a bucket ID using the {@link BucketAssigner}.
+ *
+ * <p>All the records are tagged with a HoodieRecordLocation. Instead of the real instant time,
+ * an INSERT record uses "I" and an UPSERT record uses "U" as the instant time. There is no need to keep
+ * the "real" instant time for each record, because the bucket ID (partition path & fileID) already decides
+ * where the record is written to. The "I" and "U" tags are only used by downstream operators to decide whether
+ * the data bucket is an INSERT or an UPSERT; we should factor them out once the underlying writer
+ * supports specifying the bucket type explicitly.
+ *
+ * <p>The output records should then be shuffled by the bucket ID, which makes the write scalable.
+ *
+ * @see BucketAssigner
+ */
+public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
+    extends KeyedProcessFunction<K, I, O>
+    implements CheckpointedFunction, CheckpointListener {
+
+  private MapState<HoodieKey, HoodieRecordLocation> indexState;
+
+  private BucketAssigner bucketAssigner;
+
+  private final Configuration conf;
+
+  private final boolean isChangingRecords;
+
+  public BucketAssignFunction(Configuration conf) {
+    this.conf = conf;
+    this.isChangingRecords = WriteOperationType.isChangingRecords(
+        WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
+    HoodieFlinkEngineContext context =
+        new HoodieFlinkEngineContext(
+            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
+            new FlinkTaskContextSupplier(getRuntimeContext()));
+    this.bucketAssigner = new BucketAssigner(
+        context,
+        writeConfig);
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) {
+    this.bucketAssigner.reset();
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) {
+    MapStateDescriptor<HoodieKey, HoodieRecordLocation> indexStateDesc =
+        new MapStateDescriptor<>(
+            "indexState",
+            TypeInformation.of(HoodieKey.class),
+            TypeInformation.of(HoodieRecordLocation.class));
+    indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
+    // 1. Look up the index state for the record location;
+    // 2. if the record already has a location (it is an update), reuse the fileId and register the bucket as an update;
+    // 3. otherwise it is an insert, let the BucketAssigner decide the location, then send the record out.
+    HoodieRecord<?> record = (HoodieRecord<?>) value;
+    final HoodieKey hoodieKey = record.getKey();
+    final BucketInfo bucketInfo;
+    final HoodieRecordLocation location;
+    // Only changing records need to look up the index for the location;
+    // append-only records are always recognized as INSERTs.
+    if (isChangingRecords && this.indexState.contains(hoodieKey)) {
+      // Set up the instant time as "U" to mark the bucket as an update bucket.
+      location = new HoodieRecordLocation("U", this.indexState.get(hoodieKey).getFileId());
+      this.bucketAssigner.addUpdate(record.getPartitionPath(), location.getFileId());
+    } else {
+      bucketInfo = this.bucketAssigner.addInsert(hoodieKey.getPartitionPath());
+      switch (bucketInfo.getBucketType()) {
+        case INSERT:
+          // This is an insert bucket, use HoodieRecordLocation instant time as "I".
+          // Downstream operators can then check the instant time to know whether
+          // a record belongs to an insert bucket.
+          location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
+          break;
+        case UPDATE:
+          location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
+          break;
+        default:
+          throw new AssertionError();
+      }
+      this.indexState.put(hoodieKey, location);
+    }
+    record.unseal();
+    record.setCurrentLocation(location);
+    record.seal();
+    out.collect((O) record);
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long l) {
+    // Refresh the table state when there are new commits.
+    this.bucketAssigner.refreshTable();
+  }
+}
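
A tiny sketch (not taken from the patch; the helper name is hypothetical) of how a downstream
operator can interpret the "I"/"U" instant-time tag that BucketAssignFunction attaches to each
record location, as described in the javadoc above.

    public class BucketTypeTagSketch {
      // "I" marks an insert bucket, "U" marks an update bucket.
      static boolean isInsertBucket(String instantTimeTag) {
        return "I".equals(instantTimeTag);
      }

      public static void main(String[] args) {
        System.out.println(isInsertBucket("I")); // true  -> the bucket is treated as an INSERT
        System.out.println(isInsertBucket("U")); // false -> the bucket is treated as an UPSERT
      }
    }
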
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java
new file mode 100644
index 0000000..f87a802
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigner.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.table.action.commit.BucketType;
+import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Bucket assigner that assigns the records in the data buffer of one checkpoint into buckets.
+ *
+ * <p>This assigner assigns the records one by one.
+ * If the record is an update, it checks for and reuses an existing UPDATE bucket or generates a new one;
+ * if the record is an insert, it first checks the record's partition for small files, trying to find a small file
+ * that still has space to append new records and reusing that small file's data bucket; if
+ * there is no such small file (or no space left for new records), it generates an INSERT bucket.
+ *
+ * <p>Use {partition}_{fileId} as the bucket identifier, so that the bucket is unique
+ * within and among partitions.
+ */
+public class BucketAssigner {
+  private static final Logger LOG = LogManager.getLogger(BucketAssigner.class);
+
+  /**
+   * Remembers the type of each bucket for later use.
+   */
+  private final HashMap<String, BucketInfo> bucketInfoMap;
+
+  private HoodieTable<?, ?, ?, ?> table;
+
+  /**
+   * Flink engine context.
+   */
+  private final HoodieFlinkEngineContext context;
+
+  /**
+   * The write config.
+   */
+  private final HoodieWriteConfig config;
+
+  /**
+   * The average record size.
+   */
+  private final long averageRecordSize;
+
+  /**
+   * Total records to write for each bucket based on
+   * the config option {@link org.apache.hudi.config.HoodieStorageConfig#PARQUET_FILE_MAX_BYTES}.
+   */
+  private final long insertRecordsPerBucket;
+
+  /**
+   * Partition path to small files mapping.
+   */
+  private final Map<String, List<SmallFile>> partitionSmallFilesMap;
+
+  /**
+   * Bucket ID (partition + fileId) -> small file assign state.
+   */
+  private final Map<String, SmallFileAssignState> smallFileAssignStates;
+
+  /**
+   * Bucket ID(partition + fileId) -> new file assign state.
+   */
+  private final Map<String, NewFileAssignState> newFileAssignStates;
+
+  public BucketAssigner(
+      HoodieFlinkEngineContext context,
+      HoodieWriteConfig config) {
+    bucketInfoMap = new HashMap<>();
+    partitionSmallFilesMap = new HashMap<>();
+    smallFileAssignStates = new HashMap<>();
+    newFileAssignStates = new HashMap<>();
+    this.context = context;
+    this.config = config;
+    this.table = HoodieFlinkTable.create(this.config, this.context);
+    averageRecordSize = averageBytesPerRecord(
+        table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
+        config);
+    LOG.info("AvgRecordSize => " + averageRecordSize);
+    insertRecordsPerBucket = config.shouldAutoTuneInsertSplits()
+        ? config.getParquetMaxFileSize() / averageRecordSize
+        : config.getCopyOnWriteInsertSplitSize();
+    LOG.info("InsertRecordsPerBucket => " + insertRecordsPerBucket);
+  }
+
+  /**
+   * Resets the states of this assigner; this should be done once per checkpoint,
+   * since all the states are accumulated within one checkpoint interval.
+   */
+  public void reset() {
+    bucketInfoMap.clear();
+    partitionSmallFilesMap.clear();
+    smallFileAssignStates.clear();
+    newFileAssignStates.clear();
+  }
+
+  public BucketInfo addUpdate(String partitionPath, String fileIdHint) {
+    final String key = StreamerUtil.generateBucketKey(partitionPath, fileIdHint);
+    if (!bucketInfoMap.containsKey(key)) {
+      BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath);
+      bucketInfoMap.put(key, bucketInfo);
+    }
+    // else do nothing because the bucket already exists.
+    return bucketInfoMap.get(key);
+  }
+
+  public BucketInfo addInsert(String partitionPath) {
+    // for new inserts, compute buckets depending on how many records we have for each partition
+    List<SmallFile> smallFiles = getSmallFilesForPartition(partitionPath);
+
+    // first try packing this into one of the smallFiles
+    for (SmallFile smallFile : smallFiles) {
+      final String key = StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId());
+      SmallFileAssignState assignState = smallFileAssignStates.get(key);
+      assert assignState != null;
+      if (assignState.canAssign()) {
+        assignState.assign();
+        // create a new bucket or re-use an existing bucket
+        BucketInfo bucketInfo;
+        if (bucketInfoMap.containsKey(key)) {
+          // Assign the insert to an existing update bucket
+          bucketInfo = bucketInfoMap.get(key);
+        } else {
+          bucketInfo = addUpdate(partitionPath, smallFile.location.getFileId());
+        }
+        return bucketInfo;
+      }
+    }
+
+    // if we have anything more, create new insert buckets, like normal
+    if (newFileAssignStates.containsKey(partitionPath)) {
+      NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath);
+      if (newFileAssignState.canAssign()) {
+        newFileAssignState.assign();
+      }
+      final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId);
+      return bucketInfoMap.get(key);
+    }
+    BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
+    final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
+    bucketInfoMap.put(key, bucketInfo);
+    newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), insertRecordsPerBucket));
+    return bucketInfo;
+  }
+
+  private List<SmallFile> getSmallFilesForPartition(String partitionPath) {
+    if (partitionSmallFilesMap.containsKey(partitionPath)) {
+      return partitionSmallFilesMap.get(partitionPath);
+    }
+    List<SmallFile> smallFiles = getSmallFiles(partitionPath);
+    if (smallFiles.size() > 0) {
+      LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
+      partitionSmallFilesMap.put(partitionPath, smallFiles);
+      smallFiles.forEach(smallFile ->
+          smallFileAssignStates.put(
+              StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId()),
+              new SmallFileAssignState(config.getParquetMaxFileSize(), smallFile, averageRecordSize)));
+      return smallFiles;
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Refreshes the table state, such as the TableFileSystemView and the HoodieTimeline.
+   */
+  public void refreshTable() {
+    this.table = HoodieFlinkTable.create(this.config, this.context);
+  }
+
+  /**
+   * Returns a list of small files in the given partition path.
+   */
+  protected List<SmallFile> getSmallFiles(String partitionPath) {
+
+    // smallFiles only for partitionPath
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+
+    if (!commitTimeline.empty()) { // if we have some commits
+      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+      List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
+          .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
+
+      for (HoodieBaseFile file : allFiles) {
+        if (file.getFileSize() < config.getParquetSmallFileLimit()) {
+          String filename = file.getFileName();
+          SmallFile sf = new SmallFile();
+          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+          sf.sizeBytes = file.getFileSize();
+          smallFileLocations.add(sf);
+        }
+      }
+    }
+
+    return smallFileLocations;
+  }
+
+  /**
+   * Obtains the average record size based on records written during previous commits. Used for estimating how many
+   * records can be packed into one file.
+   */
+  protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
+    long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
+    long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
+    try {
+      if (!commitTimeline.empty()) {
+        // Go over the reverse ordered commits to get a more recent estimate of average record size.
+        Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
+        while (instants.hasNext()) {
+          HoodieInstant instant = instants.next();
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+              .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+          long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+          long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+          if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
+            avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
+            break;
+          }
+        }
+      }
+    } catch (Throwable t) {
+      // make this fail safe.
+      LOG.error("Error trying to compute average bytes/record ", t);
+    }
+    return avgSize;
+  }
+
+  /**
+   * Candidate bucket state for a small file. It records the total number of records
+   * that can still be appended to the bucket and the current number of assigned records.
+   */
+  private static class SmallFileAssignState {
+    long assigned;
+    long totalUnassigned;
+
+    SmallFileAssignState(long parquetMaxFileSize, SmallFile smallFile, long averageRecordSize) {
+      this.assigned = 0;
+      this.totalUnassigned = (parquetMaxFileSize - smallFile.sizeBytes) / averageRecordSize;
+    }
+
+    public boolean canAssign() {
+      return this.totalUnassigned > 0 && this.totalUnassigned > this.assigned;
+    }
+
+    /**
+     * Remember to invoke {@link #canAssign()} first.
+     */
+    public void assign() {
+      Preconditions.checkState(canAssign(),
+          "Can not assign insert to small file: assigned => "
+              + this.assigned + " totalUnassigned => " + this.totalUnassigned);
+      this.assigned++;
+    }
+  }
+
+  /**
+   * Candidate bucket state for a new file. It records the total number of records
+   * that can still be appended to the bucket and the current number of assigned records.
+   */
+  private static class NewFileAssignState {
+    long assigned;
+    long totalUnassigned;
+    final String fileId;
+
+    NewFileAssignState(String fileId, long insertRecordsPerBucket) {
+      this.fileId = fileId;
+      this.assigned = 0;
+      this.totalUnassigned = insertRecordsPerBucket;
+    }
+
+    public boolean canAssign() {
+      return this.totalUnassigned > 0 && this.totalUnassigned > this.assigned;
+    }
+
+    /**
+     * Remember to invoke {@link #canAssign()} first.
+     */
+    public void assign() {
+      Preconditions.checkState(canAssign(),
+          "Can not assign insert to new file: assigned => "
+              + this.assigned + " totalUnassigned => " + this.totalUnassigned);
+      this.assigned++;
+    }
+  }
+}
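
A small worked example of the capacity math BucketAssigner uses above. The sizes are assumed for
illustration (120MB max base file size, 1KB average record size, an existing 100MB small file) and
do not come from the patch: insertRecordsPerBucket is parquetMaxFileSize / averageRecordSize, and a
small file can still take (parquetMaxFileSize - sizeBytes) / averageRecordSize more records.

    public class BucketCapacitySketch {
      public static void main(String[] args) {
        long parquetMaxFileSize = 120L * 1024 * 1024; // assumed max base file size
        long averageRecordSize = 1024;                // assumed average record size in bytes
        long smallFileSizeBytes = 100L * 1024 * 1024; // assumed current size of a small file

        // NewFileAssignState capacity: how many inserts a brand new bucket can take.
        long insertRecordsPerBucket = parquetMaxFileSize / averageRecordSize;
        // SmallFileAssignState capacity: how many inserts can still be appended to the small file.
        long smallFileCapacity = (parquetMaxFileSize - smallFileSizeBytes) / averageRecordSize;

        System.out.println(insertRecordsPerBucket); // 122880
        System.out.println(smallFileCapacity);      // 20480
      }
    }
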
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
new file mode 100644
index 0000000..2d47c79
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/transform/RowDataToHoodieFunction.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.transform;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+
+/**
+ * Function that transforms RowData to HoodieRecord.
+ */
+public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?>>
+    extends RichMapFunction<I, O> {
+  /**
+   * Row type of the input.
+   */
+  private final RowType rowType;
+
+  /**
+   * Avro schema of the input.
+   */
+  private transient Schema avroSchema;
+
+  /**
+   * RowData to Avro record converter.
+   */
+  private transient RowDataToAvroConverters.RowDataToAvroConverter converter;
+
+  /**
+   * HoodieKey generator.
+   */
+  private transient KeyGenerator keyGenerator;
+
+  /**
+   * Config options.
+   */
+  private final Configuration config;
+
+  public RowDataToHoodieFunction(RowType rowType, Configuration config) {
+    this.rowType = rowType;
+    this.config = config;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    this.avroSchema = StreamerUtil.getSourceSchema(this.config);
+    this.converter = RowDataToAvroConverters.createConverter(this.rowType);
+    this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public O map(I i) throws Exception {
+    return (O) toHoodieRecord(i);
+  }
+
+  /**
+   * Converts the given record to a {@link HoodieRecord}.
+   *
+   * @param record The input record
+   * @return HoodieRecord based on the configuration
+   * @throws IOException if an error occurs
+   */
+  @SuppressWarnings("rawtypes")
+  private HoodieRecord toHoodieRecord(I record) throws IOException {
+    boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
+        || WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
+    GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
+    final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS);
+    Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
+        this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
+    HoodieRecordPayload payload = shouldCombine
+        ? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
+        : StreamerUtil.createPayload(payloadClazz, gr);
+    return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
+  }
+}
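
A minimal sketch of the combine decision made in toHoodieRecord above, reduced to plain Java with
illustrative names: the ordering value is attached to the payload only when duplicates are dropped
or the operation is UPSERT.

    public class CombineDecisionSketch {
      static boolean shouldCombine(boolean insertDropDups, String operation) {
        return insertDropDups || "upsert".equalsIgnoreCase(operation);
      }

      public static void main(String[] args) {
        System.out.println(shouldCombine(false, "insert")); // false: payload built without the ordering value
        System.out.println(shouldCombine(false, "upsert")); // true:  payload built with the ordering value
        System.out.println(shouldCombine(true, "insert"));  // true:  dedup also needs the ordering value
      }
    }
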
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 7df63fa..418e2ea 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -71,8 +71,7 @@ public class FlinkStreamerConfig extends Configuration {
       + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
       + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
       + "to individual classes, for supported properties.")
-  public String propsFilePath =
-      "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
+  public String propsFilePath = "";
 
   @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
       + "(using the CLI parameter \"--props\") can also be passed command line using this parameter.")
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index f6d75d3..d110bff 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -22,9 +22,11 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.FlinkOptions;
 import org.apache.hudi.operator.InstantGenerateOperator;
 import org.apache.hudi.operator.KeyedWriteProcessFunction;
 import org.apache.hudi.operator.KeyedWriteProcessOperator;
+import org.apache.hudi.operator.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.CommitSink;
 import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
 import org.apache.hudi.util.StreamerUtil;
@@ -34,9 +36,11 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 
 import java.util.List;
@@ -66,12 +70,16 @@ public class HoodieFlinkStreamer {
       env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
     }
 
+    Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
+    int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
+
     TypedProperties props = StreamerUtil.appendKafkaProps(cfg);
 
     // add data source config
     props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName);
     props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField);
 
+    StreamerUtil.initTableIfNotExists(conf);
     // Read from kafka source
     DataStream<HoodieRecord> inputRecords =
         env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props))
@@ -86,13 +94,20 @@ public class HoodieFlinkStreamer {
 
         // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time
         .keyBy(HoodieRecord::getPartitionPath)
-
+        // use the bucket assigner to generate bucket IDs
+        .transform(
+            "bucket_assigner",
+            TypeInformation.of(HoodieRecord.class),
+            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+        .uid("uid_bucket_assigner")
+        // shuffle by fileId (bucket id)
+        .keyBy(record -> record.getCurrentLocation().getFileId())
         // write operator, where the write operation really happens
         .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
         }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
         .name("write_process")
         .uid("write_process_uid")
-        .setParallelism(env.getParallelism())
+        .setParallelism(numWriteTask)
 
         // Commit can only be executed once, so make it one parallelism
         .addSink(new CommitSink())
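
The key-by ordering above (partition path before the bucket assigner, then fileId before the write
operator) is the heart of the scalability change. A plain-Java sketch of the two-stage grouping,
with no Flink dependency and illustrative record values:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.stream.Collectors;

    public class TwoStageShuffleSketch {
      static class Rec {
        final String partitionPath;
        final String fileId;

        Rec(String partitionPath, String fileId) {
          this.partitionPath = partitionPath;
          this.fileId = fileId;
        }
      }

      public static void main(String[] args) {
        List<Rec> tagged = Arrays.asList(
            new Rec("par1", "f0"),
            new Rec("par1", "f0"),
            new Rec("par2", "f1"));

        // Stage 1: group by partition path -- one task assigns buckets per partition.
        Map<String, List<Rec>> byPartition = tagged.stream()
            .collect(Collectors.groupingBy(r -> r.partitionPath, TreeMap::new, Collectors.toList()));

        // Stage 2: group by the assigned fileId (the bucket ID) -- one write task owns each bucket.
        Map<String, List<Rec>> byBucket = tagged.stream()
            .collect(Collectors.groupingBy(r -> r.fileId, TreeMap::new, Collectors.toList()));

        System.out.println(byPartition.keySet()); // [par1, par2]
        System.out.println(byBucket.keySet());    // [f0, f1]
      }
    }
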
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
index a8f9245..24b8994 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
@@ -18,22 +18,25 @@
 
 package org.apache.hudi.streamer;
 
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.operator.FlinkOptions;
 import org.apache.hudi.operator.StreamWriteOperatorFactory;
+import org.apache.hudi.operator.partitioner.BucketAssignFunction;
+import org.apache.hudi.operator.transform.RowDataToHoodieFunction;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import com.beust.jcommander.JCommander;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
 import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Properties;
@@ -70,13 +73,8 @@ public class HoodieFlinkStreamerV2 {
             .getLogicalType();
     Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
     int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
-    StreamWriteOperatorFactory<RowData> operatorFactory =
-        new StreamWriteOperatorFactory<>(rowType, conf, numWriteTask);
-
-    int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD));
-    LogicalType partitionFieldType = rowType.getTypeAt(partitionFieldIndex);
-    final RowData.FieldGetter partitionFieldGetter =
-        RowData.createFieldGetter(partitionFieldType, partitionFieldIndex);
+    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
+        new StreamWriteOperatorFactory<>(conf, numWriteTask);
 
     DataStream<Object> dataStream = env.addSource(new FlinkKafkaConsumer<>(
         cfg.kafkaTopic,
@@ -89,11 +87,19 @@ public class HoodieFlinkStreamerV2 {
         ), kafkaProps))
         .name("kafka_source")
         .uid("uid_kafka_source")
+        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
         // Key-by partition path, to avoid multiple subtasks write to a partition at the same time
-        .keyBy(partitionFieldGetter::getFieldOrNull)
+        .keyBy(HoodieRecord::getPartitionPath)
+        .transform(
+            "bucket_assigner",
+            TypeInformation.of(HoodieRecord.class),
+            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+        .uid("uid_bucket_assigner")
+        // shuffle by fileId (bucket id)
+        .keyBy(record -> record.getCurrentLocation().getFileId())
         .transform("hoodie_stream_write", null, operatorFactory)
         .uid("uid_hoodie_stream_write")
-        .setParallelism(numWriteTask); // should make it configurable
+        .setParallelism(numWriteTask);
 
     env.addOperator(dataStream.getTransformation());
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 2fa8757..4447705 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -57,6 +59,7 @@ import java.util.Properties;
  * Utilities for Flink stream read and write.
  */
 public class StreamerUtil {
+  private static final String DEFAULT_ARCHIVE_LOG_FOLDER = "archived";
 
   private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);
 
@@ -68,6 +71,9 @@ public class StreamerUtil {
   }
 
   public static TypedProperties getProps(FlinkStreamerConfig cfg) {
+    if (cfg.propsFilePath.isEmpty()) {
+      return new TypedProperties();
+    }
     return readConfig(
         FSUtils.getFs(cfg.propsFilePath, getHadoopConf()),
         new Path(cfg.propsFilePath), cfg.configs).getConfig();
@@ -208,6 +214,10 @@ public class StreamerUtil {
     }
   }
 
+  public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig conf) {
+    return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf));
+  }
+
   public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
     HoodieWriteConfig.Builder builder =
         HoodieWriteConfig.newBuilder()
@@ -250,4 +260,37 @@ public class StreamerUtil {
     checkPropNames.forEach(prop ->
         Preconditions.checkState(props.containsKey(prop), "Required property " + prop + " is missing"));
   }
+
+  /**
+   * Initialize the table if it does not exist.
+   *
+   * @param conf the configuration
+   * @throws IOException if an error happens while writing metadata
+   */
+  public static void initTableIfNotExists(Configuration conf) throws IOException {
+    final String basePath = conf.getString(FlinkOptions.PATH);
+    final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
+    // Hadoop FileSystem
+    try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
+      if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
+        HoodieTableMetaClient.initTableType(
+            hadoopConf,
+            basePath,
+            HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
+            conf.getString(FlinkOptions.TABLE_NAME),
+            DEFAULT_ARCHIVE_LOG_FOLDER,
+            conf.getString(FlinkOptions.PAYLOAD_CLASS),
+            1);
+        LOG.info("Table initialized under base path {}", basePath);
+      } else {
+        LOG.info("Table [{}/{}] already exists, no need to initialize the table",
+            basePath, conf.getString(FlinkOptions.TABLE_NAME));
+      }
+    }
+  }
+
+  /** Generates the bucket ID using the format {partition path}_{fileID}. */
+  public static String generateBucketKey(String partitionPath, String fileId) {
+    return String.format("%s_%s", partitionPath, fileId);
+  }
 }
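
A tiny usage sketch of the new generateBucketKey helper: the bucket key is simply
"{partition path}_{fileID}", which is what makes a bucket unique across partitions.

    public class BucketKeySketch {
      public static void main(String[] args) {
        // Mirrors StreamerUtil.generateBucketKey("par1", "file_id_0").
        System.out.println(String.format("%s_%s", "par1", "file_id_0")); // prints: par1_file_id_0
      }
    }
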
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
index c2d7a65..fea9b8f 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
@@ -27,6 +27,7 @@ import org.apache.hudi.operator.utils.StreamWriteFunctionWrapper;
 import org.apache.hudi.operator.utils.TestConfigurations;
 import org.apache.hudi.operator.utils.TestData;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.table.data.RowData;
 import org.hamcrest.MatcherAssert;
@@ -56,24 +57,26 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class StreamWriteFunctionTest {
 
-  private static final Map<String, String> EXPECTED = new HashMap<>();
-
-  static {
-    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
-    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
-    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
-    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
-  }
+  private static final Map<String, String> EXPECTED1 = new HashMap<>();
 
   private static final Map<String, String> EXPECTED2 = new HashMap<>();
 
+  private static final Map<String, String> EXPECTED3 = new HashMap<>();
+
   static {
+    EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
+    EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
+    EXPECTED1.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
+    EXPECTED1.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+
     EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
     EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]");
     EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, "
         + "id9,par3,id9,Jane,19,6,par3]");
     EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, "
         + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+
+    EXPECTED3.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
   }
 
   private StreamWriteFunctionWrapper<RowData> funcWrapper;
@@ -83,9 +86,7 @@ public class StreamWriteFunctionTest {
 
   @BeforeEach
   public void before() throws Exception {
-    this.funcWrapper = new StreamWriteFunctionWrapper<>(
-        tempFile.getAbsolutePath(),
-        TestConfigurations.SERIALIZER);
+    this.funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath());
   }
 
   @AfterEach
@@ -211,7 +212,7 @@ public class StreamWriteFunctionTest {
 
     final OperatorEvent nextEvent = funcWrapper.getNextEvent();
     assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
-    checkWrittenData(tempFile, EXPECTED);
+    checkWrittenData(tempFile, EXPECTED1);
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -220,7 +221,43 @@ public class StreamWriteFunctionTest {
     funcWrapper.checkpointComplete(1);
     // the coordinator checkpoint commits the inflight instant.
     checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
-    checkWrittenData(tempFile, EXPECTED);
+    checkWrittenData(tempFile, EXPECTED1);
+  }
+
+  @Test
+  public void testInsertDuplicates() throws Exception {
+    // reset the config option
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_THREE) {
+      funcWrapper.invoke(rowData);
+    }
+
+    assertEmptyDataFiles();
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    checkWrittenData(tempFile, EXPECTED3, 1);
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    funcWrapper.checkpointComplete(1);
+
+    // insert duplicates again
+    for (RowData rowData : TestData.DATA_SET_THREE) {
+      funcWrapper.invoke(rowData);
+    }
+
+    funcWrapper.checkpointFunction(2);
+
+    checkWrittenData(tempFile, EXPECTED3, 1);
   }
 
   @Test
@@ -248,7 +285,7 @@ public class StreamWriteFunctionTest {
       funcWrapper.invoke(rowData);
     }
     // the data is not flushed yet
-    checkWrittenData(tempFile, EXPECTED);
+    checkWrittenData(tempFile, EXPECTED1);
     // this triggers the data write and event send
     funcWrapper.checkpointFunction(2);
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
index 56f946b..f745a3c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
@@ -18,16 +18,24 @@
 
 package org.apache.hudi.operator;
 
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.operator.partitioner.BucketAssignFunction;
+import org.apache.hudi.operator.transform.RowDataToHoodieFunction;
 import org.apache.hudi.operator.utils.TestConfigurations;
 import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.sink.CommitSink;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
@@ -37,7 +45,7 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.table.data.RowData;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.TestLogger;
@@ -47,6 +55,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
@@ -74,19 +83,15 @@ public class StreamWriteITCase extends TestLogger {
     StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     execEnv.getConfig().disableObjectReuse();
     execEnv.setParallelism(4);
-    // 1 second a time
-    execEnv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
 
-    // Read from kafka source
+    // Read from file source
     RowType rowType =
         (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
-    StreamWriteOperatorFactory<RowData> operatorFactory =
-        new StreamWriteOperatorFactory<>(rowType, conf, 4);
-
-    int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD));
-    final RowData.FieldGetter partitionFieldGetter =
-        RowData.createFieldGetter(rowType.getTypeAt(partitionFieldIndex), partitionFieldIndex);
+    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
+        new StreamWriteOperatorFactory<>(conf, 4);
 
     JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
         rowType,
@@ -107,17 +112,103 @@ public class StreamWriteITCase extends TestLogger {
         // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
         .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
         .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(4)
+        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
         // Key-by partition path, to avoid multiple subtasks write to a partition at the same time
-        .keyBy(partitionFieldGetter::getFieldOrNull)
+        .keyBy(HoodieRecord::getPartitionPath)
+        .transform(
+            "bucket_assigner",
+            TypeInformation.of(HoodieRecord.class),
+            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+        .uid("uid_bucket_assigner")
+        // shuffle by fileId (bucket id)
+        .keyBy(record -> record.getCurrentLocation().getFileId())
         .transform("hoodie_stream_write", null, operatorFactory)
-        .uid("uid_hoodie_stream_write")
-        .setParallelism(4);
+        .uid("uid_hoodie_stream_write");
     execEnv.addOperator(dataStream.getTransformation());
 
     JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
     if (client.getJobStatus().get() != JobStatus.FAILED) {
       try {
-        TimeUnit.SECONDS.sleep(10);
+        TimeUnit.SECONDS.sleep(8);
+        client.cancel();
+      } catch (Throwable var1) {
+        // ignored
+      }
+    }
+
+    TestData.checkWrittenData(tempFile, EXPECTED);
+  }
+
+  @Test
+  public void testWriteToHoodieLegacy() throws Exception {
+    FlinkStreamerConfig streamerConf = TestConfigurations.getDefaultStreamerConf(tempFile.getAbsolutePath());
+    Configuration conf = FlinkOptions.fromStreamerConfig(streamerConf);
+    StreamerUtil.initTableIfNotExists(conf);
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(4);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getConfig().setGlobalJobParameters(streamerConf);
+
+    // Read from file source
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        new RowDataTypeInfo(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+    format.setFilesFilter(FilePathFilter.createDefaultFilter());
+    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+    format.setCharsetName("UTF-8");
+
+    execEnv
+        // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+        .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(4)
+        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
+        .transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
+        .name("instant_generator")
+        .uid("instant_generator_id")
+
+        // Key by partition path, to avoid multiple subtasks writing to a partition at the same time
+        .keyBy(HoodieRecord::getPartitionPath)
+        // use the bucket assigner to generate bucket IDs
+        .transform(
+            "bucket_assigner",
+            TypeInformation.of(HoodieRecord.class),
+            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+        .uid("uid_bucket_assigner")
+        // shuffle by fileId (bucket id)
+        .keyBy(record -> record.getCurrentLocation().getFileId())
+        // write operator, where the write operation really happens
+        .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
+        }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
+        .name("write_process")
+        .uid("write_process_uid")
+        .setParallelism(4)
+
+        // Commit can only be executed once, so make it one parallelism
+        .addSink(new CommitSink())
+        .name("commit_sink")
+        .uid("commit_sink_uid")
+        .setParallelism(1);
+
+    JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
+    if (client.getJobStatus().get() != JobStatus.FAILED) {
+      try {
+        TimeUnit.SECONDS.sleep(8);
         client.cancel();
       } catch (Throwable var1) {
         // ignored
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java
new file mode 100644
index 0000000..e27ea07
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/partitioner/TestBucketAssigner.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.table.action.commit.BucketInfo;
+import org.apache.hudi.table.action.commit.BucketType;
+import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test cases for {@link BucketAssigner}.
+ */
+public class TestBucketAssigner {
+  private HoodieWriteConfig writeConfig;
+  private HoodieFlinkEngineContext context;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws IOException {
+    final String basePath = tempFile.getAbsolutePath();
+    final Configuration conf = TestConfigurations.getDefaultConf(basePath);
+
+    writeConfig = StreamerUtil.getHoodieClientConfig(conf);
+    context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(StreamerUtil.getHadoopConf()),
+        new FlinkTaskContextSupplier(null));
+    StreamerUtil.initTableIfNotExists(conf);
+  }
+
+  @Test
+  public void testAddUpdate() {
+    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig);
+    BucketInfo bucketInfo = mockBucketAssigner.addUpdate("par1", "file_id_0");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "file_id_0");
+
+    mockBucketAssigner.addUpdate("par1", "file_id_0");
+    bucketInfo = mockBucketAssigner.addUpdate("par1", "file_id_0");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "file_id_0");
+
+    mockBucketAssigner.addUpdate("par1", "file_id_1");
+    bucketInfo = mockBucketAssigner.addUpdate("par1", "file_id_1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "file_id_1");
+
+    bucketInfo = mockBucketAssigner.addUpdate("par2", "file_id_0");
+    assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "file_id_0");
+
+    bucketInfo = mockBucketAssigner.addUpdate("par3", "file_id_2");
+    assertBucketEquals(bucketInfo, "par3", BucketType.UPDATE, "file_id_2");
+  }
+
+  @Test
+  public void testAddInsert() {
+    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig);
+    BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.INSERT);
+
+    mockBucketAssigner.addInsert("par1");
+    bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.INSERT);
+
+    mockBucketAssigner.addInsert("par2");
+    bucketInfo = mockBucketAssigner.addInsert("par2");
+    assertBucketEquals(bucketInfo, "par2", BucketType.INSERT);
+
+    bucketInfo = mockBucketAssigner.addInsert("par3");
+    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
+
+    bucketInfo = mockBucketAssigner.addInsert("par3");
+    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
+  }
+
+  @Test
+  public void testInsertWithSmallFiles() {
+    SmallFile f0 = new SmallFile();
+    f0.location = new HoodieRecordLocation("t0", "f0");
+    f0.sizeBytes = 12;
+
+    SmallFile f1 = new SmallFile();
+    f1.location = new HoodieRecordLocation("t0", "f1");
+    f1.sizeBytes = 122879; // no space left to append new records to this bucket
+
+    SmallFile f2 = new SmallFile();
+    f2.location = new HoodieRecordLocation("t0", "f2");
+    f2.sizeBytes = 56;
+
+    Map<String, List<SmallFile>> smallFilesMap = new HashMap<>();
+    smallFilesMap.put("par1", Arrays.asList(f0, f1));
+    smallFilesMap.put("par2", Collections.singletonList(f2));
+
+    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig, smallFilesMap);
+    BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addInsert("par1");
+    bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addInsert("par2");
+    bucketInfo = mockBucketAssigner.addInsert("par2");
+    assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "f2");
+
+    bucketInfo = mockBucketAssigner.addInsert("par3");
+    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
+
+    bucketInfo = mockBucketAssigner.addInsert("par3");
+    assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
+  }
+
+  @Test
+  public void testUpdateAndInsertWithSmallFiles() {
+    SmallFile f0 = new SmallFile();
+    f0.location = new HoodieRecordLocation("t0", "f0");
+    f0.sizeBytes = 12;
+
+    SmallFile f1 = new SmallFile();
+    f1.location = new HoodieRecordLocation("t0", "f1");
+    f1.sizeBytes = 122879; // no space left to append new records to this bucket
+
+    SmallFile f2 = new SmallFile();
+    f2.location = new HoodieRecordLocation("t0", "f2");
+    f2.sizeBytes = 56;
+
+    Map<String, List<SmallFile>> smallFilesMap = new HashMap<>();
+    smallFilesMap.put("par1", Arrays.asList(f0, f1));
+    smallFilesMap.put("par2", Collections.singletonList(f2));
+
+    MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig, smallFilesMap);
+    mockBucketAssigner.addUpdate("par1", "f0");
+
+    BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addInsert("par1");
+    bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addUpdate("par1", "f2");
+
+    mockBucketAssigner.addInsert("par1");
+    bucketInfo = mockBucketAssigner.addInsert("par1");
+    assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+    mockBucketAssigner.addUpdate("par2", "f0");
+
+    mockBucketAssigner.addInsert("par2");
+    bucketInfo = mockBucketAssigner.addInsert("par2");
+    assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "f2");
+  }
+
+  private void assertBucketEquals(
+      BucketInfo bucketInfo,
+      String partition,
+      BucketType bucketType,
+      String fileId) {
+    BucketInfo actual = new BucketInfo(bucketType, fileId, partition);
+    assertThat(bucketInfo, is(actual));
+  }
+
+  private void assertBucketEquals(
+      BucketInfo bucketInfo,
+      String partition,
+      BucketType bucketType) {
+    assertThat(bucketInfo.getPartitionPath(), is(partition));
+    assertThat(bucketInfo.getBucketType(), is(bucketType));
+  }
+
+  /**
+   * Mock BucketAssigner that can specify small files explicitly.
+   */
+  static class MockBucketAssigner extends BucketAssigner {
+    private final Map<String, List<SmallFile>> smallFilesMap;
+
+    MockBucketAssigner(
+        HoodieFlinkEngineContext context,
+        HoodieWriteConfig config) {
+      this(context, config, Collections.emptyMap());
+    }
+
+    MockBucketAssigner(
+        HoodieFlinkEngineContext context,
+        HoodieWriteConfig config,
+        Map<String, List<SmallFile>> smallFilesMap) {
+      super(context, config);
+      this.smallFilesMap = smallFilesMap;
+    }
+
+    @Override
+    protected List<SmallFile> getSmallFiles(String partitionPath) {
+      if (this.smallFilesMap.containsKey(partitionPath)) {
+        return this.smallFilesMap.get(partitionPath);
+      }
+      return Collections.emptyList();
+    }
+  }
+}
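
The small-file tests above exercise the routing rule that BucketAssigner inherits from the upsert partitioning logic: an insert into a partition that still has a small file with spare capacity is redirected to that file as an UPDATE bucket, otherwise a fresh INSERT bucket (and file id) is created. Below is a minimal, illustrative sketch of that decision only; the names InsertRoutingSketch, routeInsert, maxFileSize and avgRecordSize are hypothetical, and the real capacity math lives inside BucketAssigner/UpsertPartitioner.

    import org.apache.hudi.table.action.commit.SmallFile;

    import java.util.List;

    class InsertRoutingSketch {
      /**
       * Returns the file id of a small file that still has room for more records,
       * or null when the insert should open a new INSERT bucket.
       */
      static String routeInsert(List<SmallFile> smallFiles, long maxFileSize, long avgRecordSize) {
        for (SmallFile f : smallFiles) {
          long assignableRecords = (maxFileSize - f.sizeBytes) / avgRecordSize;
          if (assignableRecords > 0) {
            return f.location.getFileId(); // becomes an UPDATE bucket on this small file
          }
        }
        return null; // no capacity left -> new INSERT bucket with a fresh file id
      }
    }

Read this way, the fixture sizes suggest the test write config caps files at roughly 120KB: f1 at 122879 bytes has no room left and is skipped, while f0 and f2 keep absorbing inserts.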
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
index 1b02791..59de283 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
@@ -18,7 +18,14 @@
 
 package org.apache.hudi.operator.utils;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.operator.StreamWriteFunction;
+import org.apache.hudi.operator.StreamWriteOperatorCoordinator;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.hudi.operator.partitioner.BucketAssignFunction;
+import org.apache.hudi.operator.transform.RowDataToHoodieFunction;
+
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -27,13 +34,9 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
 import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
-
-import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.operator.StreamWriteFunction;
-import org.apache.hudi.operator.StreamWriteOperatorCoordinator;
-import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -43,7 +46,6 @@ import java.util.concurrent.CompletableFuture;
  * @param <I> Input type
  */
 public class StreamWriteFunctionWrapper<I> {
-  private final TypeSerializer<I> serializer;
   private final Configuration conf;
 
   private final IOManager ioManager;
@@ -52,10 +54,18 @@ public class StreamWriteFunctionWrapper<I> {
   private final StreamWriteOperatorCoordinator coordinator;
   private final MockFunctionInitializationContext functionInitializationContext;
 
-  private StreamWriteFunction<Object, I, Object> function;
+  /** Function that converts row data to HoodieRecord. */
+  private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
+  /** Function that assigns bucket ID. */
+  private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> bucketAssignerFunction;
+  /** Stream write function. */
+  private StreamWriteFunction<Object, HoodieRecord<?>, Object> writeFunction;
 
-  public StreamWriteFunctionWrapper(String tablePath, TypeSerializer<I> serializer) throws Exception {
-    this.serializer = serializer;
+  public StreamWriteFunctionWrapper(String tablePath) throws Exception {
+    this(tablePath, TestConfigurations.getDefaultConf(tablePath));
+  }
+
+  public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws Exception {
     this.ioManager = new IOManagerAsync();
     MockEnvironment environment = new MockEnvironmentBuilder()
         .setTaskName("mockTask")
@@ -64,7 +74,7 @@ public class StreamWriteFunctionWrapper<I> {
         .build();
     this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
     this.gateway = new MockOperatorEventGateway();
-    this.conf = TestConfigurations.getDefaultConf(tablePath);
+    this.conf = conf;
     // one function
     this.coordinator = new StreamWriteOperatorCoordinator(conf, 1);
     this.coordinator.start();
@@ -72,14 +82,37 @@ public class StreamWriteFunctionWrapper<I> {
   }
 
   public void openFunction() throws Exception {
-    function = new StreamWriteFunction<>(TestConfigurations.ROW_TYPE, this.conf);
-    function.setRuntimeContext(runtimeContext);
-    function.setOperatorEventGateway(gateway);
-    function.open(this.conf);
+    toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
+    toHoodieFunction.setRuntimeContext(runtimeContext);
+    toHoodieFunction.open(conf);
+
+    bucketAssignerFunction = new BucketAssignFunction<>(conf);
+    bucketAssignerFunction.setRuntimeContext(runtimeContext);
+    bucketAssignerFunction.open(conf);
+    bucketAssignerFunction.initializeState(this.functionInitializationContext);
+
+    writeFunction = new StreamWriteFunction<>(conf);
+    writeFunction.setRuntimeContext(runtimeContext);
+    writeFunction.setOperatorEventGateway(gateway);
+    writeFunction.open(conf);
   }
 
   public void invoke(I record) throws Exception {
-    function.processElement(record, null, null);
+    HoodieRecord<?> hoodieRecord = toHoodieFunction.map((RowData) record);
+    HoodieRecord<?>[] hoodieRecords = new HoodieRecord[1];
+    Collector<HoodieRecord<?>> collector = new Collector<HoodieRecord<?>>() {
+      @Override
+      public void collect(HoodieRecord<?> record) {
+        hoodieRecords[0] = record;
+      }
+
+      @Override
+      public void close() {
+
+      }
+    };
+    bucketAssignerFunction.processElement(hoodieRecord, null, collector);
+    writeFunction.processElement(hoodieRecords[0], null, null);
   }
 
   public BatchWriteSuccessEvent[] getEventBuffer() {
@@ -92,19 +125,22 @@ public class StreamWriteFunctionWrapper<I> {
 
   @SuppressWarnings("rawtypes")
   public HoodieFlinkWriteClient getWriteClient() {
-    return this.function.getWriteClient();
+    return this.writeFunction.getWriteClient();
   }
 
   public void checkpointFunction(long checkpointId) throws Exception {
     // checkpoint the coordinator first
     this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
-    function.snapshotState(new MockFunctionSnapshotContext(checkpointId));
+    bucketAssignerFunction.snapshotState(null);
+
+    writeFunction.snapshotState(null);
     functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
   }
 
   public void checkpointComplete(long checkpointId) {
     functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
     coordinator.checkpointComplete(checkpointId);
+    this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
   }
 
   public void checkpointFails(long checkpointId) {
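
To see how the refactored wrapper is meant to be driven, here is a sketch of one write/checkpoint cycle using only the public methods visible in this diff; the JUnit scaffolding (a @TempDir tempFile field, the test annotations) is assumed, and TestData.DATA_SET_THREE is the RowData fixture added further below.

    @Test
    public void testWriteAndCommitSketch() throws Exception {
      StreamWriteFunctionWrapper<RowData> wrapper =
          new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath());
      wrapper.openFunction();                      // opens the convert -> bucket-assign -> write chain
      for (RowData row : TestData.DATA_SET_THREE) {
        wrapper.invoke(row);                       // RowData -> HoodieRecord -> bucket location -> buffered write
      }
      wrapper.checkpointFunction(1);               // snapshot: the write function flushes its buffered records
      wrapper.checkpointComplete(1);               // the coordinator commits the instant on checkpoint success
    }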
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
index 7513fed..d9e603a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.operator.utils;
 
 import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
@@ -56,4 +57,16 @@ public class TestConfigurations {
     conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
     return conf;
   }
+
+  public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) {
+    FlinkStreamerConfig streamerConf = new FlinkStreamerConfig();
+    streamerConf.targetBasePath = tablePath;
+    streamerConf.readSchemaFilePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_read_schema.avsc")).toString();
+    streamerConf.targetTableName = "TestHoodieTable";
+    streamerConf.partitionPathField = "partition";
+    streamerConf.tableType = "COPY_ON_WRITE";
+    streamerConf.checkpointInterval = 4000L;
+    return streamerConf;
+  }
 }
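
The new helper mirrors getDefaultConf for the CLI-style config object; since the FlinkStreamerConfig fields assigned above are plain public fields, a test can take the defaults and override individual options, for example (illustrative only):

    FlinkStreamerConfig streamerConf = TestConfigurations.getDefaultStreamerConf(tempFile.getAbsolutePath());
    streamerConf.checkpointInterval = 2000L; // shorten the interval for a faster test run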
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
index 7c2c314..b4c24ef 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
@@ -43,6 +43,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.IntStream;
 
 import static junit.framework.TestCase.assertEquals;
 import static org.hamcrest.CoreMatchers.is;
@@ -92,6 +93,13 @@ public class TestData {
           TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
   );
 
+  public static List<RowData> DATA_SET_THREE = new ArrayList<>();
+  static {
+    IntStream.range(0, 5).forEach(i -> DATA_SET_THREE.add(
+        binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+            TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
+  }
+
   /**
    * Checks the source data TestConfigurations.DATA_SET_ONE are written as expected.
    *
@@ -101,13 +109,29 @@ public class TestData {
    * @param expected The expected results mapping, the key should be the partition path
    */
   public static void checkWrittenData(File baseFile, Map<String, String> expected) throws IOException {
+    checkWrittenData(baseFile, expected, 4);
+  }
+
+  /**
+   * Checks that the source data TestConfigurations.DATA_SET_ONE is written as expected.
+   *
+   * <p>Note: Replace it with the Flink reader when it is supported.
+   *
+   * @param baseFile   The file base to check, should be a directory
+   * @param expected   The expected results mapping, the key should be the partition path
+   * @param partitions The expected number of partition directories
+   */
+  public static void checkWrittenData(
+      File baseFile,
+      Map<String, String> expected,
+      int partitions) throws IOException {
     assert baseFile.isDirectory();
     FileFilter filter = file -> !file.getName().startsWith(".");
     File[] partitionDirs = baseFile.listFiles(filter);
     assertNotNull(partitionDirs);
-    assertThat(partitionDirs.length, is(4));
+    assertThat(partitionDirs.length, is(partitions));
     for (File partitionDir : partitionDirs) {
-      File[] dataFiles = partitionDir.listFiles(file -> file.getName().endsWith(".parquet"));
+      File[] dataFiles = partitionDir.listFiles(filter);
       assertNotNull(dataFiles);
       File latestDataFile = Arrays.stream(dataFiles)
           .max(Comparator.comparing(f -> FSUtils.getCommitTime(f.getName())))