Posted to commits@hudi.apache.org by da...@apache.org on 2021/09/02 08:32:55 UTC

[hudi] branch master updated: [HUDI-2376] Add pipeline for Append mode (#3573)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a1bd22  [HUDI-2376] Add pipeline for Append mode (#3573)
7a1bd22 is described below

commit 7a1bd225cab6b1737d689d3659351a8c950a4d78
Author: yuzhaojing <32...@users.noreply.github.com>
AuthorDate: Thu Sep 2 16:32:40 2021 +0800

    [HUDI-2376] Add pipeline for Append mode (#3573)
    
    Co-authored-by: 喻兆靖 <yu...@bilibili.com>
---
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   5 -
 .../apache/hudi/configuration/FlinkOptions.java    |  29 ++-
 .../org/apache/hudi/sink/StreamWriteFunction.java  | 194 +---------------
 .../org/apache/hudi/sink/StreamWriteOperator.java  |  28 +--
 .../hudi/sink/append/AppendWriteFunction.java      | 137 +++++++++++
 .../AppendWriteOperator.java}                      |  30 ++-
 .../hudi/sink/bulk/BulkInsertWriteFunction.java    |  36 +--
 .../hudi/sink/bulk/BulkInsertWriteOperator.java    |  77 +-----
 .../hudi/sink/bulk/BulkInsertWriterHelper.java     |  27 ++-
 .../apache/hudi/sink/bulk/sort/SortOperator.java   |   1 -
 .../sink/common/AbstractStreamWriteFunction.java   | 258 +++++++++++++++++++++
 .../AbstractWriteFunction.java}                    |  32 ++-
 .../AbstractWriteOperator.java}                    |  36 ++-
 .../WriteOperatorFactory.java}                     |  19 +-
 .../org/apache/hudi/sink/event/CommitAckEvent.java |   3 +-
 .../partitioner/profile/DeltaWriteProfile.java     |   2 +-
 .../sink/partitioner/profile/WriteProfiles.java    |   6 +-
 .../sink/transform/RowDataToHoodieFunctions.java   |   3 +-
 .../apache/hudi/sink/transform/Transformer.java    |   1 +
 .../java/org/apache/hudi/sink/utils/Pipelines.java |  31 ++-
 .../hudi/source/StreamReadMonitoringFunction.java  |  20 +-
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |  38 +--
 .../org/apache/hudi/table/HoodieTableFactory.java  |   4 +-
 .../org/apache/hudi/table/HoodieTableSink.java     |   9 +-
 .../org/apache/hudi/table/HoodieTableSource.java   |   3 +-
 .../apache/hudi/table/format/FilePathUtils.java    |  26 +--
 .../table/format/cow/AbstractColumnReader.java     |   3 +-
 .../table/format/mor/MergeOnReadInputSplit.java    |  20 +-
 .../table/format/mor/MergeOnReadTableState.java    |   1 -
 .../org/apache/hudi/util/AvroSchemaConverter.java  |   8 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |   3 +
 .../org/apache/hudi/sink/StreamWriteITCase.java    |   4 -
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |  24 +-
 .../apache/hudi/sink/bulk/TestRowDataKeyGen.java   |   2 +-
 .../hudi/sink/utils/CompactFunctionWrapper.java    |  14 +-
 .../hudi/sink/utils/InsertFunctionWrapper.java     | 141 +++++++++++
 .../sink/utils/StreamWriteFunctionWrapper.java     |   2 +-
 .../apache/hudi/source/TestStreamReadOperator.java |   1 +
 .../apache/hudi/table/HoodieDataSourceITCase.java  |  35 ++-
 .../org/apache/hudi/utils/TestConfigurations.java  |  12 +-
 .../test/java/org/apache/hudi/utils/TestSQL.java   |   3 +-
 .../org/apache/hudi/utils/TestStreamerUtil.java    |  16 +-
 42 files changed, 854 insertions(+), 490 deletions(-)
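
For readers following the change, here is a rough sketch of how the new append operator
factory is intended to be used from the Flink DataStream API. The class name
AppendPipelineSketch, the operator name/uid and the use of FlinkOptions.WRITE_TASKS for the
write parallelism are illustrative assumptions; the actual wiring lives in
org.apache.hudi.sink.utils.Pipelines, which this commit also updates.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.sink.append.AppendWriteOperator;

    public class AppendPipelineSketch {
      // Routes a RowData stream through the coordinated append-mode write operator.
      public static DataStream<Object> append(Configuration conf, RowType rowType, DataStream<RowData> input) {
        return input
            .transform("append_write", TypeInformation.of(Object.class),
                AppendWriteOperator.getFactory(conf, rowType))
            .uid("uid_append_write")
            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
      }
    }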

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 7b553bc..fdefd90 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -448,11 +448,6 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     final HoodieRecordLocation loc = record.getCurrentLocation();
     final String fileID = loc.getFileId();
     final String partitionPath = record.getPartitionPath();
-    // Always use FlinkCreateHandle when insert duplication turns on
-    if (config.allowDuplicateInserts()) {
-      return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
-          fileID, table.getTaskContextSupplier());
-    }
 
     if (bucketToHandles.containsKey(fileID)) {
       MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 0e2b0b3..8504f6a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -222,10 +222,10 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
 
   public static final ConfigOption<Boolean> INSERT_DEDUP = ConfigOptions
-          .key("write.insert.deduplicate")
-          .booleanType()
-          .defaultValue(true)
-          .withDescription("Whether to deduplicate for INSERT operation, if disabled, writes the base files directly, default true");
+      .key("write.insert.deduplicate")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether to deduplicate for INSERT operation, if disabled, writes the base files directly, default true");
 
   public static final ConfigOption<String> OPERATION = ConfigOptions
       .key("write.operation")
@@ -370,6 +370,27 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(1024)
       .withDescription("Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB");
 
+  public static final ConfigOption<Integer> WRITE_PARQUET_BLOCK_SIZE = ConfigOptions
+      .key("write.parquet.block.size")
+      .intType()
+      .defaultValue(120)
+      .withDescription("Parquet RowGroup size. It's recommended to make this large enough that scan costs can be"
+          + " amortized by packing enough column values into a single row group.");
+
+  public static final ConfigOption<Integer> WRITE_PARQUET_MAX_FILE_SIZE = ConfigOptions
+      .key("write.parquet.max.file.size")
+      .intType()
+      .defaultValue(120)
+      .withDescription("Target size for parquet files produced by Hudi write phases. "
+          + "For DFS, this needs to be aligned with the underlying filesystem block size for optimal performance.");
+
+  public static final ConfigOption<Integer> WRITE_PARQUET_PAGE_SIZE = ConfigOptions
+      .key("hoodie.parquet.page.size")
+      .intType()
+      .defaultValue(1)
+      .withDescription("Parquet page size. Page is the unit of read within a parquet file. "
+          + "Within a block, pages are compressed separately.");
+
   public static final ConfigOption<Integer> WRITE_MERGE_MAX_MEMORY = ConfigOptions
       .key("write.merge.max_memory")
       .intType()
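
The three new write.parquet.* options above are plumbed through to the parquet writer used by
the append and bulk-insert paths. Their descriptions do not state a unit; judging from the
defaults (120, 120 and 1) they appear to be megabytes, but treat that as an assumption. A
minimal sketch of overriding them on a Flink Configuration:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class ParquetSizeConfigSketch {
      public static Configuration withParquetSizes() {
        Configuration conf = new Configuration();
        conf.setInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE, 120);    // row group size
        conf.setInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 120); // target base file size
        conf.setInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE, 1);       // page size within a row group
        return conf;
      }
    }
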
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 71b20ba..a155fb5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -18,38 +18,26 @@
 
 package org.apache.hudi.sink;
 
-import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.sink.event.CommitAckEvent;
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.utils.TimeWait;
 import org.apache.hudi.table.action.commit.FlinkWriteHelper;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +49,6 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Random;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
@@ -101,9 +88,7 @@ import java.util.stream.Collectors;
  * @param <I> Type of the input record
  * @see StreamWriteOperatorCoordinator
  */
-public class StreamWriteFunction<K, I, O>
-    extends KeyedProcessFunction<K, I, O>
-    implements CheckpointedFunction {
+public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
 
   private static final long serialVersionUID = 1L;
 
@@ -114,76 +99,20 @@ public class StreamWriteFunction<K, I, O>
    */
   private transient Map<String, DataBucket> buckets;
 
-  /**
-   * Config options.
-   */
-  private final Configuration config;
-
-  /**
-   * Id of current subtask.
-   */
-  private int taskID;
-
-  /**
-   * Write Client.
-   */
-  private transient HoodieFlinkWriteClient writeClient;
-
   private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
 
   /**
-   * The REQUESTED instant we write the data.
-   */
-  private volatile String currentInstant;
-
-  /**
-   * Gateway to send operator events to the operator coordinator.
-   */
-  private transient OperatorEventGateway eventGateway;
-
-  /**
-   * Commit action type.
-   */
-  private transient String actionType;
-
-  /**
    * Total size tracer.
    */
   private transient TotalSizeTracer tracer;
 
   /**
-   * Flag saying whether the write task is waiting for the checkpoint success notification
-   * after it finished a checkpoint.
-   *
-   * <p>The flag is needed because the write task does not block during the waiting time interval,
-   * some data buckets still flush out with old instant time. There are two cases that the flush may produce
-   * corrupted files if the old instant is committed successfully:
-   * 1) the write handle was writing data but interrupted, left a corrupted parquet file;
-   * 2) the write handle finished the write but was not closed, left an empty parquet file.
-   *
-   * <p>To solve, when this flag was set to true, we block the data flushing thus the #processElement method,
-   * the flag was reset to false if the task receives the checkpoint success event or the latest inflight instant
-   * time changed(the last instant committed successfully).
-   */
-  private volatile boolean confirming = false;
-
-  /**
-   * List state of the write metadata events.
-   */
-  private transient ListState<WriteMetadataEvent> writeMetadataState;
-
-  /**
-   * Write status list for the current checkpoint.
-   */
-  private List<WriteStatus> writeStatuses;
-
-  /**
    * Constructs a StreamingSinkFunction.
    *
    * @param config The config options
    */
   public StreamWriteFunction(Configuration config) {
-    this.config = config;
+    super(config);
   }
 
   @Override
@@ -194,42 +123,15 @@ public class StreamWriteFunction<K, I, O>
   }
 
   @Override
-  public void initializeState(FunctionInitializationContext context) throws Exception {
-    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-    this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
-    this.actionType = CommitUtils.getCommitActionType(
-        WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
-        HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
-
-    this.writeStatuses = new ArrayList<>();
-    this.writeMetadataState = context.getOperatorStateStore().getListState(
-        new ListStateDescriptor<>(
-            "write-metadata-state",
-            TypeInformation.of(WriteMetadataEvent.class)
-        ));
-
-    this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
-    if (context.isRestored()) {
-      restoreWriteMetadata();
-    } else {
-      sendBootstrapEvent();
-    }
-    // blocks flushing until the coordinator starts a new instant
-    this.confirming = true;
-  }
-
-  @Override
-  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+  public void snapshotState() {
     // Based on the fact that the coordinator starts the checkpoint first,
     // it would check the validity.
     // wait for the buffer data flush out and request a new instant
     flushRemaining(false);
-    // Reload the snapshot state as the current state.
-    reloadWriteMetaState();
   }
 
   @Override
-  public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) {
+  public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) throws Exception {
     bufferRecord((HoodieRecord<?>) value);
   }
 
@@ -264,21 +166,6 @@ public class StreamWriteFunction<K, I, O>
     return ret;
   }
 
-  @VisibleForTesting
-  @SuppressWarnings("rawtypes")
-  public HoodieFlinkWriteClient getWriteClient() {
-    return writeClient;
-  }
-
-  @VisibleForTesting
-  public boolean isConfirming() {
-    return this.confirming;
-  }
-
-  public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
-    this.eventGateway = operatorEventGateway;
-  }
-
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
@@ -307,49 +194,6 @@ public class StreamWriteFunction<K, I, O>
     }
   }
 
-  private void restoreWriteMetadata() throws Exception {
-    String lastInflight = this.writeClient.getLastPendingInstant(this.actionType);
-    boolean eventSent = false;
-    for (WriteMetadataEvent event : this.writeMetadataState.get()) {
-      if (Objects.equals(lastInflight, event.getInstantTime())) {
-        // The checkpoint succeed but the meta does not commit,
-        // re-commit the inflight instant
-        this.eventGateway.sendEventToCoordinator(event);
-        LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
-        eventSent = true;
-      }
-    }
-    if (!eventSent) {
-      sendBootstrapEvent();
-    }
-  }
-
-  private void sendBootstrapEvent() {
-    this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
-    LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
-  }
-
-  /**
-   * Reload the write metadata state as the current checkpoint.
-   */
-  private void reloadWriteMetaState() throws Exception {
-    this.writeMetadataState.clear();
-    WriteMetadataEvent event = WriteMetadataEvent.builder()
-        .taskID(taskID)
-        .instantTime(currentInstant)
-        .writeStatus(new ArrayList<>(writeStatuses))
-        .bootstrap(true)
-        .build();
-    this.writeMetadataState.add(event);
-    writeStatuses.clear();
-  }
-
-  public void handleOperatorEvent(OperatorEvent event) {
-    ValidationUtils.checkArgument(event instanceof CommitAckEvent,
-        "The write function can only handle CommitAckEvent");
-    this.confirming = false;
-  }
-
   /**
    * Represents a data item in the buffer, this is needed to reduce the
    * memory footprint.
@@ -562,32 +406,6 @@ public class StreamWriteFunction<K, I, O>
         && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
   }
 
-  private String instantToWrite(boolean hasData) {
-    String instant = this.writeClient.getLastPendingInstant(this.actionType);
-    // if exactly-once semantics turns on,
-    // waits for the checkpoint notification until the checkpoint timeout threshold hits.
-    TimeWait timeWait = TimeWait.builder()
-        .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
-        .action("instant initialize")
-        .build();
-    while (confirming) {
-      // wait condition:
-      // 1. there is no inflight instant
-      // 2. the inflight instant does not change and the checkpoint has buffering data
-      if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
-        // sleep for a while
-        timeWait.waitFor();
-        // refresh the inflight instant
-        instant = this.writeClient.getLastPendingInstant(this.actionType);
-      } else {
-        // the inflight instant changed, which means the last instant was committed
-        // successfully.
-        confirming = false;
-      }
-    }
-    return instant;
-  }
-
   @SuppressWarnings("unchecked, rawtypes")
   private boolean flushBucket(DataBucket bucket) {
     String instant = instantToWrite(true);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
index c16743e..9e39e3f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
@@ -18,12 +18,10 @@
 
 package org.apache.hudi.sink;
 
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
-import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.StreamSink;
 
 /**
@@ -31,27 +29,13 @@ import org.apache.flink.streaming.api.operators.StreamSink;
  *
  * @param <I> The input type
  */
-public class StreamWriteOperator<I>
-    extends KeyedProcessOperator<Object, I, Object>
-    implements OperatorEventHandler, BoundedOneInput {
-  private final StreamWriteFunction<Object, I, Object> sinkFunction;
+public class StreamWriteOperator<I> extends AbstractWriteOperator<I> {
 
   public StreamWriteOperator(Configuration conf) {
     super(new StreamWriteFunction<>(conf));
-    this.sinkFunction = (StreamWriteFunction<Object, I, Object>) getUserFunction();
-  }
-
-  @Override
-  public void handleOperatorEvent(OperatorEvent event) {
-    this.sinkFunction.handleOperatorEvent(event);
-  }
-
-  void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
-    sinkFunction.setOperatorEventGateway(operatorEventGateway);
   }
 
-  @Override
-  public void endInput() {
-    sinkFunction.endInput();
+  public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
+    return WriteOperatorFactory.instance(conf, new StreamWriteOperator<>(conf));
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
new file mode 100644
index 0000000..128c030
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.append;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * <p>The function writes base files directly for each checkpoint,
+ * the file may roll over when its size hits the configured threshold.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Helper class for log mode.
+   */
+  private transient BulkInsertWriterHelper writerHelper;
+
+  /**
+   * Table row type.
+   */
+  private final RowType rowType;
+
+  /**
+   * Constructs an AppendWriteFunction.
+   *
+   * @param config The config options
+   */
+  public AppendWriteFunction(Configuration config, RowType rowType) {
+    super(config);
+    this.rowType = rowType;
+  }
+
+  @Override
+  public void snapshotState() {
+    // Based on the fact that the coordinator starts the checkpoint first,
+    // it would check the validity.
+    // wait for the buffer data flush out and request a new instant
+    flushData(false);
+    // nullify the write helper for next ckp
+    this.writerHelper = null;
+  }
+
+  @Override
+  public void processElement(I value, Context ctx, Collector<Object> out) throws Exception {
+    if (this.writerHelper == null) {
+      initWriterHelper();
+    }
+    this.writerHelper.write((RowData) value);
+  }
+
+  @Override
+  public void close() {
+    if (this.writeClient != null) {
+      this.writeClient.cleanHandlesGracefully();
+      this.writeClient.close();
+    }
+  }
+
+  /**
+   * End input action for batch source.
+   */
+  public void endInput() {
+    flushData(true);
+    this.writeClient.cleanHandles();
+    this.writeStatuses.clear();
+  }
+
+  // -------------------------------------------------------------------------
+  //  GetterSetter
+  // -------------------------------------------------------------------------
+  @VisibleForTesting
+  public BulkInsertWriterHelper getWriterHelper() {
+    return this.writerHelper;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+  private void initWriterHelper() {
+    this.currentInstant = instantToWrite(true);
+    if (this.currentInstant == null) {
+      // in case there are empty checkpoints that have no input data
+      throw new HoodieException("No inflight instant when flushing data!");
+    }
+    this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+        this.currentInstant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
+        this.rowType);
+  }
+
+  private void flushData(boolean endInput) {
+    final List<WriteStatus> writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
+    final WriteMetadataEvent event = WriteMetadataEvent.builder()
+        .taskID(taskID)
+        .instantTime(this.writerHelper.getInstantTime())
+        .writeStatus(writeStatus)
+        .lastBatch(true)
+        .endInput(endInput)
+        .build();
+    this.eventGateway.sendEventToCoordinator(event);
+  }
+}
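
AppendWriteFunction writes incoming RowData straight into parquet base files via
BulkInsertWriterHelper and flushes once per checkpoint, instead of buffering HoodieRecords the
way StreamWriteFunction does. The sketch below shows the kind of configuration this path is
aimed at: a plain INSERT with deduplication disabled, per the INSERT_DEDUP description earlier
in this commit. Which exact combination of options selects the append pipeline is decided in
HoodieTableSink/Pipelines and is not fully visible in this excerpt, so the table path and
option values here are assumptions for illustration.

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class AppendModeConfigSketch {
      public static Configuration appendModeConf() {
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.PATH, "/tmp/hudi/append_table");   // hypothetical table path
        conf.setString(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE");
        conf.setString(FlinkOptions.OPERATION, "insert");              // plain INSERT ...
        conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);             // ... without deduplication
        return conf;
      }
    }
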
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteOperator.java
similarity index 54%
copy from hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java
copy to hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteOperator.java
index 5b811a8..ad1a002 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteOperator.java
@@ -16,30 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink.transform;
+package org.apache.hudi.sink.append;
 
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 
 /**
- * Utilities for {@link RowDataToHoodieFunction}.
+ * Operator for {@link AppendWriteFunction}.
+ *
+ * @param <I> The input type
  */
-public abstract class RowDataToHoodieFunctions {
-  private RowDataToHoodieFunctions() {}
+public class AppendWriteOperator<I> extends AbstractWriteOperator<I> {
+
+  public AppendWriteOperator(Configuration conf, RowType rowType) {
+    super(new AppendWriteFunction<>(conf, rowType));
+  }
 
-  /**
-   * Creates a {@link RowDataToHoodieFunction} instance based on the given configuration.
-   */
-  @SuppressWarnings("rawtypes")
-  public static RowDataToHoodieFunction<RowData, HoodieRecord> create(RowType rowType, Configuration conf) {
-    if (conf.getLong(FlinkOptions.WRITE_RATE_LIMIT) > 0) {
-      return new RowDataToHoodieFunctionWithRateLimit<>(rowType, conf);
-    } else {
-      return new RowDataToHoodieFunction<>(rowType, conf);
-    }
+  public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, RowType rowType) {
+    return WriteOperatorFactory.instance(conf, new AppendWriteOperator<>(conf, rowType));
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index dd0f7bc..7fce5c0 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -19,21 +19,20 @@
 package org.apache.hudi.sink.bulk;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.utils.TimeWait;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
@@ -43,7 +42,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Sink function to write the data to the underneath filesystem.
@@ -55,8 +53,8 @@ import java.util.stream.Collectors;
  * @param <I> Type of the input record
  * @see StreamWriteOperatorCoordinator
  */
-public class BulkInsertWriteFunction<I, O>
-    extends ProcessFunction<I, O> {
+public class BulkInsertWriteFunction<I>
+    extends AbstractWriteFunction<I> {
 
   private static final long serialVersionUID = 1L;
 
@@ -126,7 +124,7 @@ public class BulkInsertWriteFunction<I, O>
   }
 
   @Override
-  public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
+  public void processElement(I value, Context ctx, Collector<Object> out) throws IOException {
     this.writerHelper.write((RowData) value);
   }
 
@@ -142,14 +140,8 @@ public class BulkInsertWriteFunction<I, O>
    * End input action for batch source.
    */
   public void endInput() {
-    final List<WriteStatus> writeStatus;
-    try {
-      this.writerHelper.close();
-      writeStatus = this.writerHelper.getWriteStatuses().stream()
-          .map(BulkInsertWriteFunction::toWriteStatus).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new HoodieException("Error collect the write status for task [" + this.taskID + "]");
-    }
+    final List<WriteStatus> writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
+
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
         .instantTime(this.writerHelper.getInstantTime())
@@ -160,17 +152,9 @@ public class BulkInsertWriteFunction<I, O>
     this.eventGateway.sendEventToCoordinator(event);
   }
 
-  /**
-   * Tool to convert {@link HoodieInternalWriteStatus} into {@link WriteStatus}.
-   */
-  private static WriteStatus toWriteStatus(HoodieInternalWriteStatus internalWriteStatus) {
-    WriteStatus writeStatus = new WriteStatus(false, 0.1);
-    writeStatus.setStat(internalWriteStatus.getStat());
-    writeStatus.setFileId(internalWriteStatus.getFileId());
-    writeStatus.setGlobalError(internalWriteStatus.getGlobalError());
-    writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords());
-    writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords());
-    return writeStatus;
+  @Override
+  public void handleOperatorEvent(OperatorEvent event) {
+    // no operation
   }
 
   // -------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java
index bc01622..16fb87f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java
@@ -18,24 +18,12 @@
 
 package org.apache.hudi.sink.bulk;
 
-import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
-import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
-import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
-import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 
 /**
@@ -44,13 +32,11 @@ import org.apache.flink.table.types.logical.RowType;
  * @param <I> The input type
  */
 public class BulkInsertWriteOperator<I>
-    extends ProcessOperator<I, Object>
-    implements OperatorEventHandler, BoundedOneInput {
-  private final BulkInsertWriteFunction<I, Object> sinkFunction;
+    extends AbstractWriteOperator<I>
+    implements BoundedOneInput {
 
   public BulkInsertWriteOperator(Configuration conf, RowType rowType) {
     super(new BulkInsertWriteFunction<>(conf, rowType));
-    this.sinkFunction = (BulkInsertWriteFunction<I, Object>) getUserFunction();
   }
 
   @Override
@@ -58,58 +44,7 @@ public class BulkInsertWriteOperator<I>
     // no operation
   }
 
-  void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
-    sinkFunction.setOperatorEventGateway(operatorEventGateway);
-  }
-
-  @Override
-  public void endInput() {
-    sinkFunction.endInput();
-  }
-
-  public static OperatorFactory<RowData> getFactory(Configuration conf, RowType rowType) {
-    return new OperatorFactory<>(conf, rowType);
-  }
-
-  // -------------------------------------------------------------------------
-  //  Inner Class
-  // -------------------------------------------------------------------------
-
-  public static class OperatorFactory<I>
-      extends SimpleUdfStreamOperatorFactory<Object>
-      implements CoordinatedOperatorFactory<Object>, OneInputStreamOperatorFactory<I, Object> {
-    private static final long serialVersionUID = 1L;
-
-    private final BulkInsertWriteOperator<I> operator;
-    private final Configuration conf;
-
-    public OperatorFactory(Configuration conf, RowType rowType) {
-      super(new BulkInsertWriteOperator<>(conf, rowType));
-      this.operator = (BulkInsertWriteOperator<I>) getOperator();
-      this.conf = conf;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {
-      final OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
-      final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
-
-      this.operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorID));
-      this.operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
-      this.operator.setProcessingTimeService(this.processingTimeService);
-      eventDispatcher.registerEventHandler(operatorID, operator);
-      return (T) operator;
-    }
-
-    @Override
-    public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
-      return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
-    }
-
-    @Override
-    public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
-      super.setProcessingTimeService(processingTimeService);
-    }
+  public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, RowType rowType) {
+    return WriteOperatorFactory.instance(conf, new BulkInsertWriteOperator<>(conf, rowType));
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index fbe7678..e0cbab6 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -19,9 +19,11 @@
 package org.apache.hudi.sink.bulk;
 
 import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
 import org.apache.hudi.table.HoodieTable;
 
@@ -39,6 +41,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 /**
  * Helper class for bulk insert used by Flink.
@@ -101,7 +104,7 @@ public class BulkInsertWriterHelper {
     }
   }
 
-  public List<HoodieInternalWriteStatus> getWriteStatuses() throws IOException {
+  public List<HoodieInternalWriteStatus> getHoodieWriteStatuses() throws IOException {
     close();
     return writeStatusList;
   }
@@ -172,5 +175,27 @@ public class BulkInsertWriterHelper {
 
     return new RowType(false, mergedFields);
   }
+
+  public List<WriteStatus> getWriteStatuses(int taskID) {
+    try {
+      return getHoodieWriteStatuses().stream()
+          .map(BulkInsertWriterHelper::toWriteStatus).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new HoodieException("Error collect the write status for task [" + taskID + "]");
+    }
+  }
+
+  /**
+   * Tool to convert {@link HoodieInternalWriteStatus} into {@link WriteStatus}.
+   */
+  private static WriteStatus toWriteStatus(HoodieInternalWriteStatus internalWriteStatus) {
+    WriteStatus writeStatus = new WriteStatus(false, 0.1);
+    writeStatus.setStat(internalWriteStatus.getStat());
+    writeStatus.setFileId(internalWriteStatus.getFileId());
+    writeStatus.setGlobalError(internalWriteStatus.getGlobalError());
+    writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords());
+    writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords());
+    return writeStatus;
+  }
 }
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java
index 7be2b41..aa62240 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java
@@ -35,7 +35,6 @@ import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
 import org.apache.flink.table.runtime.util.StreamRecordCollector;
 import org.apache.flink.util.MutableObjectIterator;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
new file mode 100644
index 0000000..654f0b8
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.common;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.CommitAckEvent;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base infrastructures for streaming writer function.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public abstract class AbstractStreamWriteFunction<I>
+    extends AbstractWriteFunction<I>
+    implements CheckpointedFunction {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamWriteFunction.class);
+
+  /**
+   * Config options.
+   */
+  protected final Configuration config;
+
+  /**
+   * Id of current subtask.
+   */
+  protected int taskID;
+
+  /**
+   * Write Client.
+   */
+  protected transient HoodieFlinkWriteClient writeClient;
+
+  /**
+   * The REQUESTED instant we write the data.
+   */
+  protected volatile String currentInstant;
+
+  /**
+   * Gateway to send operator events to the operator coordinator.
+   */
+  protected transient OperatorEventGateway eventGateway;
+
+  /**
+   * Commit action type.
+   */
+  protected transient String actionType;
+
+  /**
+   * Flag saying whether the write task is waiting for the checkpoint success notification
+   * after it finished a checkpoint.
+   *
+   * <p>The flag is needed because the write task does not block during the waiting time interval,
+   * some data buckets still flush out with old instant time. There are two cases that the flush may produce
+   * corrupted files if the old instant is committed successfully:
+   * 1) the write handle was writing data but interrupted, left a corrupted parquet file;
+   * 2) the write handle finished the write but was not closed, left an empty parquet file.
+   *
+   * <p>To solve, when this flag was set to true, we block the data flushing thus the #processElement method,
+   * the flag was reset to false if the task receives the checkpoint success event or the latest inflight instant
+   * time changed(the last instant committed successfully).
+   */
+  protected volatile boolean confirming = false;
+
+  /**
+   * List state of the write metadata events.
+   */
+  private transient ListState<WriteMetadataEvent> writeMetadataState;
+
+  /**
+   * Write status list for the current checkpoint.
+   */
+  protected List<WriteStatus> writeStatuses;
+
+  /**
+   * Constructs a StreamWriteFunctionBase.
+   *
+   * @param config The config options
+   */
+  public AbstractStreamWriteFunction(Configuration config) {
+    this.config = config;
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws Exception {
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
+    this.actionType = CommitUtils.getCommitActionType(
+        WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+        HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
+
+    this.writeStatuses = new ArrayList<>();
+    this.writeMetadataState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>(
+            "write-metadata-state",
+            TypeInformation.of(WriteMetadataEvent.class)
+        ));
+
+    this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
+    if (context.isRestored()) {
+      restoreWriteMetadata();
+    } else {
+      sendBootstrapEvent();
+    }
+    // blocks flushing until the coordinator starts a new instant
+    this.confirming = true;
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+    snapshotState();
+    // Reload the snapshot state as the current state.
+    reloadWriteMetaState();
+  }
+
+  public abstract void snapshotState();
+
+  // -------------------------------------------------------------------------
+  //  Getter/Setter
+  // -------------------------------------------------------------------------
+  @VisibleForTesting
+  @SuppressWarnings("rawtypes")
+  public HoodieFlinkWriteClient getWriteClient() {
+    return writeClient;
+  }
+
+  @VisibleForTesting
+  public boolean isConfirming() {
+    return this.confirming;
+  }
+
+  public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
+    this.eventGateway = operatorEventGateway;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void restoreWriteMetadata() throws Exception {
+    String lastInflight = this.writeClient.getLastPendingInstant(this.actionType);
+    boolean eventSent = false;
+    for (WriteMetadataEvent event : this.writeMetadataState.get()) {
+      if (Objects.equals(lastInflight, event.getInstantTime())) {
+        // The checkpoint succeed but the meta does not commit,
+        // re-commit the inflight instant
+        this.eventGateway.sendEventToCoordinator(event);
+        LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
+        eventSent = true;
+      }
+    }
+    if (!eventSent) {
+      sendBootstrapEvent();
+    }
+  }
+
+  private void sendBootstrapEvent() {
+    this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
+    LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
+  }
+
+  /**
+   * Reload the write metadata state as the current checkpoint.
+   */
+  private void reloadWriteMetaState() throws Exception {
+    this.writeMetadataState.clear();
+    WriteMetadataEvent event = WriteMetadataEvent.builder()
+        .taskID(taskID)
+        .instantTime(currentInstant)
+        .writeStatus(new ArrayList<>(writeStatuses))
+        .bootstrap(true)
+        .build();
+    this.writeMetadataState.add(event);
+    writeStatuses.clear();
+  }
+
+  public void handleOperatorEvent(OperatorEvent event) {
+    ValidationUtils.checkArgument(event instanceof CommitAckEvent,
+        "The write function can only handle CommitAckEvent");
+    this.confirming = false;
+  }
+
+  /**
+   * Prepares the instant time to write with for next checkpoint.
+   *
+   * @param hasData Whether the task has buffering data
+   * @return The instant time
+   */
+  protected String instantToWrite(boolean hasData) {
+    String instant = this.writeClient.getLastPendingInstant(this.actionType);
+    // if exactly-once semantics turns on,
+    // waits for the checkpoint notification until the checkpoint timeout threshold hits.
+    TimeWait timeWait = TimeWait.builder()
+        .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
+        .action("instant initialize")
+        .build();
+    while (confirming) {
+      // wait condition:
+      // 1. there is no inflight instant
+      // 2. the inflight instant does not change and the checkpoint has buffering data
+      if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
+        // sleep for a while
+        timeWait.waitFor();
+        // refresh the inflight instant
+        instant = this.writeClient.getLastPendingInstant(this.actionType);
+      } else {
+        // the inflight instant changed, which means the last instant was committed
+        // successfully.
+        confirming = false;
+      }
+    }
+    return instant;
+  }
+}
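
AbstractStreamWriteFunction now owns the instant/coordinator handshake (bootstrap and restore
of the write metadata state, CommitAckEvent handling, and the instantToWrite wait loop), so a
concrete write function only supplies buffering, flushing and end-of-input behaviour. A
minimal, do-nothing subclass sketch to make that contract explicit; the class name is
hypothetical, and StreamWriteFunction and AppendWriteFunction are the real implementations:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;
    import org.apache.hudi.sink.common.AbstractStreamWriteFunction;

    public class NoOpWriteFunction<I> extends AbstractStreamWriteFunction<I> {

      public NoOpWriteFunction(Configuration config) {
        super(config);
      }

      @Override
      public void processElement(I value, Context ctx, Collector<Object> out) {
        // buffer or write the incoming record here
      }

      @Override
      public void snapshotState() {
        // flush buffered data and emit a WriteMetadataEvent for the current instant
      }

      @Override
      public void endInput() {
        // final flush for bounded (batch) input
      }
    }
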
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
similarity index 52%
copy from hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java
copy to hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
index 93f74af..8e77600 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
@@ -16,22 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink.event;
+package org.apache.hudi.sink.common;
 
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
 
 /**
- * An operator event to mark successful instant commit.
+ * Base class for write function.
+ *
+ * @param <I> the input type
  */
-public class CommitAckEvent implements OperatorEvent {
-  private static final long serialVersionUID = 1L;
-
-  private static final CommitAckEvent INSTANCE = new CommitAckEvent();
+public abstract class AbstractWriteFunction<I> extends ProcessFunction<I, Object> implements BoundedOneInput {
+  /**
+   * Sets up the event gateway.
+   */
+  public abstract void setOperatorEventGateway(OperatorEventGateway operatorEventGateway);
 
-  // default constructor for efficient serialization
-  public CommitAckEvent() {}
+  /**
+   * Invoked when bounded source ends up.
+   */
+  public abstract void endInput();
 
-  public static CommitAckEvent getInstance() {
-    return INSTANCE;
-  }
+  /**
+   * Handles the operator event sent by the coordinator.
+   * @param event The event
+   */
+  public abstract void handleOperatorEvent(OperatorEvent event);
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
similarity index 58%
copy from hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
copy to hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
index c16743e..e339ccb 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
@@ -16,42 +16,40 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink;
+package org.apache.hudi.sink.common;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
-import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
 
 /**
- * Operator for {@link StreamSink}.
+ * Base class for write operator.
  *
- * @param <I> The input type
+ * @param <I> the input type
  */
-public class StreamWriteOperator<I>
-    extends KeyedProcessOperator<Object, I, Object>
+public abstract class AbstractWriteOperator<I>
+    extends ProcessOperator<I, Object>
     implements OperatorEventHandler, BoundedOneInput {
-  private final StreamWriteFunction<Object, I, Object> sinkFunction;
+  private final AbstractWriteFunction<I> function;
 
-  public StreamWriteOperator(Configuration conf) {
-    super(new StreamWriteFunction<>(conf));
-    this.sinkFunction = (StreamWriteFunction<Object, I, Object>) getUserFunction();
+  public AbstractWriteOperator(AbstractWriteFunction<I> function) {
+    super(function);
+    this.function = function;
   }
 
-  @Override
-  public void handleOperatorEvent(OperatorEvent event) {
-    this.sinkFunction.handleOperatorEvent(event);
+  public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
+    this.function.setOperatorEventGateway(operatorEventGateway);
   }
 
-  void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
-    sinkFunction.setOperatorEventGateway(operatorEventGateway);
+  @Override
+  public void endInput() {
+    this.function.endInput();
   }
 
   @Override
-  public void endInput() {
-    sinkFunction.endInput();
+  public void handleOperatorEvent(OperatorEvent evt) {
+    this.function.handleOperatorEvent(evt);
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/WriteOperatorFactory.java
similarity index 83%
rename from hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java
rename to hudi-flink/src/main/java/org/apache/hudi/sink/common/WriteOperatorFactory.java
index ce89886..01a28de 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/WriteOperatorFactory.java
@@ -16,7 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink;
+package org.apache.hudi.sink.common;
+
+import org.apache.hudi.sink.StreamWriteOperator;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -31,20 +34,24 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 /**
  * Factory class for {@link StreamWriteOperator}.
  */
-public class StreamWriteOperatorFactory<I>
+public class WriteOperatorFactory<I>
     extends SimpleUdfStreamOperatorFactory<Object>
     implements CoordinatedOperatorFactory<Object>, OneInputStreamOperatorFactory<I, Object> {
   private static final long serialVersionUID = 1L;
 
-  private final StreamWriteOperator<I> operator;
+  private final AbstractWriteOperator<I> operator;
   private final Configuration conf;
 
-  public StreamWriteOperatorFactory(Configuration conf) {
-    super(new StreamWriteOperator<>(conf));
-    this.operator = (StreamWriteOperator<I>) getOperator();
+  public WriteOperatorFactory(Configuration conf, AbstractWriteOperator<I> operator) {
+    super(operator);
+    this.operator = operator;
     this.conf = conf;
   }
 
+  public static <I> WriteOperatorFactory<I> instance(Configuration conf, AbstractWriteOperator<I> operator) {
+    return new WriteOperatorFactory<>(conf, operator);
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java
index 93f74af..541fd06 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java
@@ -29,7 +29,8 @@ public class CommitAckEvent implements OperatorEvent {
   private static final CommitAckEvent INSTANCE = new CommitAckEvent();
 
   // default constructor for efficient serialization
-  public CommitAckEvent() {}
+  public CommitAckEvent() {
+  }
 
   public static CommitAckEvent getInstance() {
     return INSTANCE;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index 9f56bdd..6b5e96e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -61,7 +61,7 @@ public class DeltaWriteProfile extends WriteProfile {
       // If we can index log files, we can add more inserts to log files for fileIds including those under
       // pending compaction.
       List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
-              .collect(Collectors.toList());
+          .collect(Collectors.toList());
       for (FileSlice fileSlice : allFileSlices) {
         if (isSmallFile(fileSlice)) {
           allSmallFileSlices.add(fileSlice);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index 1fcf3f1..0ab8f12 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -52,7 +52,8 @@ public class WriteProfiles {
 
   private static final Map<String, WriteProfile> PROFILES = new HashMap<>();
 
-  private WriteProfiles() {}
+  private WriteProfiles() {
+  }
 
   public static synchronized WriteProfile singleton(
       boolean ignoreSmallFiles,
@@ -104,7 +105,6 @@ public class WriteProfiles {
    * @param basePath Table base path
    * @param metadata The metadata
    * @param fs       The filesystem
-   *
    * @return the commit file status list
    */
   private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
@@ -143,7 +143,6 @@ public class WriteProfiles {
    * @param basePath  The table base path
    * @param instant   The hoodie instant
    * @param timeline  The timeline
-   *
    * @return the commit metadata or empty if any error occurs
    */
   public static Option<HoodieCommitMetadata> getCommitMetadataSafely(
@@ -172,7 +171,6 @@ public class WriteProfiles {
    * @param basePath  The table base path
    * @param instant   The hoodie instant
    * @param timeline  The timeline
-   *
    * @return the commit metadata
    */
   public static HoodieCommitMetadata getCommitMetadata(
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java
index 5b811a8..0007fd1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java
@@ -29,7 +29,8 @@ import org.apache.flink.table.types.logical.RowType;
  * Utilities for {@link RowDataToHoodieFunction}.
  */
 public abstract class RowDataToHoodieFunctions {
-  private RowDataToHoodieFunctions() {}
+  private RowDataToHoodieFunctions() {
+  }
 
   /**
    * Creates a {@link RowDataToHoodieFunction} instance based on the given configuration.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java
index f40a838..282cca7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/Transformer.java
@@ -28,6 +28,7 @@ public interface Transformer {
 
   /**
    * Transform source DataStream to target DataStream.
+   *
    * @param source
    */
   DataStream<RowData> apply(DataStream<RowData> source);
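
Because Transformer exposes a single abstract method, an implementation can be supplied as a lambda; a trivial sketch (the filter predicate and `sourceStream` are only examples):

    Transformer keepNonNullKeys = source -> source.filter(row -> !row.isNullAt(0));
    DataStream<RowData> transformed = keepNonNullKeys.apply(sourceStream);
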
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index f31645c..1211188 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -21,12 +21,14 @@ package org.apache.hudi.sink.utils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.CleanFunction;
-import org.apache.hudi.sink.StreamWriteOperatorFactory;
+import org.apache.hudi.sink.StreamWriteOperator;
+import org.apache.hudi.sink.append.AppendWriteOperator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
 import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
 import org.apache.hudi.sink.compact.CompactFunction;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -41,6 +43,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -52,7 +55,7 @@ import org.apache.flink.table.types.logical.RowType;
 public class Pipelines {
 
   public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
-    BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
+    WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
 
     final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
     if (partitionFields.length > 0) {
@@ -80,9 +83,17 @@ public class Pipelines {
             operatorFactory)
         // follow the parallelism of upstream operators to avoid shuffle
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
-        .addSink(new CleanFunction<>(conf))
-        .setParallelism(1)
-        .name("clean_commits");
+        .addSink(DummySink.INSTANCE);
+  }
+
+  public static DataStreamSink<Object> append(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
+    WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
+
+    return dataStream
+        .transform("hoodie_append_write", TypeInformation.of(Object.class), operatorFactory)
+        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+        .addSink(DummySink.INSTANCE);
   }
 
   public static DataStream<HoodieRecord> bootstrap(
@@ -143,7 +154,7 @@ public class Pipelines {
   }
 
   public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
-    StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
+    WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
     return dataStream
         // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
         .keyBy(HoodieRecord::getRecordKey)
@@ -180,4 +191,12 @@ public class Pipelines {
         .setParallelism(1)
         .name("clean_commits");
   }
+
+  /**
+   * Dummy sink that does nothing.
+   */
+  public static class DummySink implements SinkFunction<Object> {
+    private static final long serialVersionUID = 1L;
+    public static final DummySink INSTANCE = new DummySink();
+  }
 }
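
Putting the new entry points together, a sketch of how a caller would pick between the append path and the upsert path, mirroring the branching that HoodieTableSink performs below (Pipelines.clean with this signature is an assumption; imports mirror those already used in Pipelines.java):

    public static DataStreamSink<?> writeToHudi(
        Configuration conf, RowType rowType, int parallelism,
        DataStream<RowData> input, boolean bounded) {
      if (StreamerUtil.allowDuplicateInserts(conf)) {
        // INSERT without deduplication: write the rows directly, no upsert state or index bootstrap
        return Pipelines.append(conf, rowType, input);
      }
      DataStream<HoodieRecord> records = Pipelines.bootstrap(conf, rowType, parallelism, input, bounded);
      DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, records);
      if (StreamerUtil.needsAsyncCompaction(conf)) {
        return Pipelines.compact(conf, pipeline);
      }
      return Pipelines.clean(conf, pipeline); // assumed: the clean_commits sink shown above
    }
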
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index ae6b3e1..112dfda 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -270,16 +270,16 @@ public class StreamReadMonitoringFunction
     final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
     List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
         .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue)
-        .map(fileSlice -> {
-          Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
-              .sorted(HoodieLogFile.getLogFileComparator())
-              .map(logFile -> logFile.getPath().toString())
-              .collect(Collectors.toList()));
-          String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
-          return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-              basePath, logPaths, commitToIssue,
-              metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
-        }).collect(Collectors.toList()))
+            .map(fileSlice -> {
+              Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+                  .sorted(HoodieLogFile.getLogFileComparator())
+                  .map(logFile -> logFile.getPath().toString())
+                  .collect(Collectors.toList()));
+              String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+              return new MergeOnReadInputSplit(cnt.getAndAdd(1),
+                  basePath, logPaths, commitToIssue,
+                  metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
+            }).collect(Collectors.toList()))
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 75272ab..3dc8cb2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -142,22 +142,22 @@ public class FlinkStreamerConfig extends Configuration {
   public Integer writeTaskNum = 4;
 
   @Parameter(names = {"--partition-default-name"},
-          description = "The default partition name in case the dynamic partition column value is null/empty string")
+      description = "The default partition name in case the dynamic partition column value is null/empty string")
   public String partitionDefaultName = "__DEFAULT_PARTITION__";
 
   @Parameter(names = {"--index-bootstrap-enabled"},
-          description = "Whether to bootstrap the index state from existing hoodie table, default false")
+      description = "Whether to bootstrap the index state from existing hoodie table, default false")
   public Boolean indexBootstrapEnabled = false;
 
   @Parameter(names = {"--index-state-ttl"}, description = "Index state ttl in days, default 1.5 day")
   public Double indexStateTtl = 1.5D;
 
   @Parameter(names = {"--index-global-enabled"}, description = "Whether to update index for the old partition path "
-          + "if same key record with different partition path came in, default false")
+      + "if same key record with different partition path came in, default false")
   public Boolean indexGlobalEnabled = false;
 
   @Parameter(names = {"--index-partition-regex"},
-          description = "Whether to load partitions in state if partition path matching, default *")
+      description = "Whether to load partitions in state if partition path matching, default *")
   public String indexPartitionRegex = ".*";
 
   @Parameter(names = {"--source-avro-schema-path"}, description = "Source avro schema file path, the parsed schema is used for deserialization")
@@ -167,8 +167,8 @@ public class FlinkStreamerConfig extends Configuration {
   public String sourceAvroSchema = "";
 
   @Parameter(names = {"--utc-timezone"}, description = "Use UTC timezone or local timezone to the conversion between epoch"
-          + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
-          + " use UTC timezone, by default true")
+      + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
+      + " use UTC timezone, by default true")
   public Boolean utcTimezone = true;
 
   @Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false")
@@ -180,18 +180,18 @@ public class FlinkStreamerConfig extends Configuration {
   public Boolean hiveStylePartitioning = false;
 
   @Parameter(names = {"--write-task-max-size"}, description = "Maximum memory in MB for a write task, when the threshold hits,\n"
-          + "it flushes the max size data bucket to avoid OOM, default 1GB")
+      + "it flushes the max size data bucket to avoid OOM, default 1GB")
   public Double writeTaskMaxSize = 1024D;
 
   @Parameter(names = {"--write-batch-size"},
-          description = "Batch buffer size in MB to flush data into the underneath filesystem, default 64MB")
+      description = "Batch buffer size in MB to flush data into the underneath filesystem, default 64MB")
   public Double writeBatchSize = 64D;
 
   @Parameter(names = {"--write-log-block-size"}, description = "Max log block size in MB for log file, default 128MB")
   public Integer writeLogBlockSize = 128;
 
   @Parameter(names = {"--write-log-max-size"},
-          description = "Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB")
+      description = "Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB")
   public Integer writeLogMaxSize = 1024;
 
   @Parameter(names = {"--write-merge-max-memory"}, description = "Max memory in MB for merge, default 100MB")
@@ -204,11 +204,11 @@ public class FlinkStreamerConfig extends Configuration {
   public Integer compactionTasks = 10;
 
   @Parameter(names = {"--compaction-trigger-strategy"},
-          description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
-                  + "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
-                  + "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
-                  + "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
-                  + "Default is 'num_commits'")
+      description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
+          + "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
+          + "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
+          + "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
+          + "Default is 'num_commits'")
   public String compactionTriggerStrategy = FlinkOptions.NUM_COMMITS;
 
   @Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 5 commits")
@@ -227,16 +227,16 @@ public class FlinkStreamerConfig extends Configuration {
   public Boolean cleanAsyncEnabled = true;
 
   @Parameter(names = {"--clean-retain-commits"},
-          description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+      description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
           + "This also directly translates into how much you can incrementally pull on this table, default 10")
   public Integer cleanRetainCommits = 10;
 
   @Parameter(names = {"--archive-max-commits"},
-          description = "Max number of commits to keep before archiving older commits into a sequential log, default 30")
+      description = "Max number of commits to keep before archiving older commits into a sequential log, default 30")
   public Integer archiveMaxCommits = 30;
 
   @Parameter(names = {"--archive-min-commits"},
-          description = "Min number of commits to keep before archiving older commits into a sequential log, default 20")
+      description = "Min number of commits to keep before archiving older commits into a sequential log, default 20")
   public Integer archiveMinCommits = 20;
 
   @Parameter(names = {"--hive-sync-enable"}, description = "Asynchronously sync Hive meta to HMS, default false")
@@ -270,7 +270,7 @@ public class FlinkStreamerConfig extends Configuration {
   public String hiveSyncPartitionFields = "";
 
   @Parameter(names = {"--hive-sync-partition-extractor-class"}, description = "Tool to extract the partition value from HDFS path, "
-          + "default 'SlashEncodedDayPartitionValueExtractor'")
+      + "default 'SlashEncodedDayPartitionValueExtractor'")
   public String hiveSyncPartitionExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();
 
   @Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false")
@@ -289,7 +289,7 @@ public class FlinkStreamerConfig extends Configuration {
   public Boolean hiveSyncSkipRoSuffix = false;
 
   @Parameter(names = {"--hive-sync-support-timestamp"}, description = "INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
-          + "Disabled by default for backward compatibility.")
+      + "Disabled by default for backward compatibility.")
   public Boolean hiveSyncSupportTimestamp = false;
 
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 89f7ed7..0344857 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -105,7 +105,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
   /**
    * The sanity check.
    *
-   * @param conf The table options
+   * @param conf   The table options
    * @param schema The table schema
    */
   private void sanityCheck(Configuration conf, ResolvedSchema schema) {
@@ -217,7 +217,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
 
   /**
    * Sets up the hive options from the table definition.
-   * */
+   */
   private static void setupHiveOptions(Configuration conf) {
     if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)
         && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME)) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 2ced22a..2fdd0fd 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -77,10 +77,17 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
 
       // default parallelism
       int parallelism = dataStream.getExecutionConfig().getParallelism();
+
+      DataStream<Object> pipeline;
+      // Append mode
+      if (StreamerUtil.allowDuplicateInserts(conf)) {
+        return Pipelines.append(conf, rowType, dataStream);
+      }
+
       // bootstrap
       final DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
       // write pipeline
-      DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+      pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
       // compaction
       if (StreamerUtil.needsAsyncCompaction(conf)) {
         return Pipelines.compact(conf, pipeline);
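
The append branch above is driven purely by configuration; a minimal sketch of the options that make StreamerUtil.allowDuplicateInserts(conf) take effect (values are examples, option keys are the FlinkOptions used in this patch):

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/tmp/hoodie_sink");
    conf.setString(FlinkOptions.TABLE_NAME, "hoodie_sink");
    conf.setString(FlinkOptions.OPERATION, "insert");   // insert operation ...
    conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);  // ... with deduplication disabled
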
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 52cb765..78d1db6 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -303,7 +303,8 @@ public class HoodieTableSource implements
                 .collect(Collectors.toList()));
             return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
                 metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
-          }).collect(Collectors.toList()); })
+          }).collect(Collectors.toList());
+    })
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
index 8f1347f..1eb7e2d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
@@ -283,11 +283,11 @@ public class FilePathUtils {
    *
    * <p>The return list should be [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}].
    *
-   * @param path The base path
-   * @param hadoopConf The hadoop configuration
-   * @param partitionKeys The partition key list
+   * @param path           The base path
+   * @param hadoopConf     The hadoop configuration
+   * @param partitionKeys  The partition key list
    * @param defaultParName The default partition name for nulls
-   * @param hivePartition Whether the partition path is in Hive style
+   * @param hivePartition  Whether the partition path is in Hive style
    */
   public static List<Map<String, String>> getPartitions(
       Path path,
@@ -338,9 +338,9 @@ public class FilePathUtils {
   /**
    * Returns all the file paths that is the parents of the data files.
    *
-   * @param path The base path
-   * @param conf The Flink configuration
-   * @param hadoopConf The hadoop configuration
+   * @param path          The base path
+   * @param conf          The Flink configuration
+   * @param hadoopConf    The hadoop configuration
    * @param partitionKeys The partition key list
    */
   public static Path[] getReadPaths(
@@ -362,11 +362,10 @@ public class FilePathUtils {
   /**
    * Transforms the given partition key value mapping to read paths.
    *
-   * @param path The base path
-   * @param partitionKeys The partition key list
+   * @param path           The base path
+   * @param partitionKeys  The partition key list
    * @param partitionPaths The partition key value mapping
-   * @param hivePartition Whether the partition path is in Hive style
-   *
+   * @param hivePartition  Whether the partition path is in Hive style
    * @see #getReadPaths
    */
   public static Path[] partitionPath2ReadPath(
@@ -384,10 +383,9 @@ public class FilePathUtils {
   /**
    * Transforms the given partition key value mapping to relative partition paths.
    *
-   * @param partitionKeys The partition key list
+   * @param partitionKeys  The partition key list
    * @param partitionPaths The partition key value mapping
-   * @param hivePartition Whether the partition path is in Hive style
-   *
+   * @param hivePartition  Whether the partition path is in Hive style
    * @see #getReadPaths
    */
   public static Set<String> toRelativePartitionPaths(
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java
index e6f40a5..efbe914 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/AbstractColumnReader.java
@@ -296,7 +296,8 @@ public abstract class AbstractColumnReader<V extends WritableColumnVector>
   /**
    * After read a page, we may need some initialization.
    */
-  protected void afterReadPage() {}
+  protected void afterReadPage() {
+  }
 
   /**
    * Support lazy dictionary ids decode. See more in {@link ParquetDictionary}.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index b929e7b..0c93eea 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -97,15 +97,15 @@ public class MergeOnReadInputSplit implements InputSplit {
   @Override
   public String toString() {
     return "MergeOnReadInputSplit{"
-            + "splitNum=" + splitNum
-            + ", basePath=" + basePath
-            + ", logPaths=" + logPaths
-            + ", latestCommit='" + latestCommit + '\''
-            + ", tablePath='" + tablePath + '\''
-            + ", maxCompactionMemoryInBytes=" + maxCompactionMemoryInBytes
-            + ", mergeType='" + mergeType + '\''
-            + ", instantRange=" + instantRange
-            + '}';
+        + "splitNum=" + splitNum
+        + ", basePath=" + basePath
+        + ", logPaths=" + logPaths
+        + ", latestCommit='" + latestCommit + '\''
+        + ", tablePath='" + tablePath + '\''
+        + ", maxCompactionMemoryInBytes=" + maxCompactionMemoryInBytes
+        + ", mergeType='" + mergeType + '\''
+        + ", instantRange=" + instantRange
+        + '}';
   }
-  
+
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 0a63c91..36dfecb 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -106,7 +106,6 @@ public class MergeOnReadTableState implements Serializable {
    *
    * @param pkOffsets the pk offsets in required row type
    * @return pk field logical types
-   *
    * @see #getPkOffsetsInRequired()
    */
   public LogicalType[] getPkTypes(int[] pkOffsets) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
index db72cc1..df815a8 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
@@ -162,7 +162,7 @@ public class AvroSchemaConverter {
    * <p>Use "record" as the type name.
    *
    * @param schema the schema type, usually it should be the top level record type, e.g. not a
-   *     nested type
+   *               nested type
    * @return Avro's {@link Schema} matching this logical type.
    */
   public static Schema convertToSchema(LogicalType schema) {
@@ -176,7 +176,7 @@ public class AvroSchemaConverter {
    * schema. Nested record type that only differs with type name is still compatible.
    *
    * @param logicalType logical type
-   * @param rowName the record name
+   * @param rowName     the record name
    * @return Avro's {@link Schema} matching this logical type.
    */
   public static Schema convertToSchema(LogicalType logicalType, String rowName) {
@@ -315,7 +315,9 @@ public class AvroSchemaConverter {
     return valueType;
   }
 
-  /** Returns schema with nullable true. */
+  /**
+   * Returns schema with nullable true.
+   */
   private static Schema nullableSchema(Schema schema) {
     return schema.isNullable()
         ? schema
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 9f625ba..b95c9e1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -181,6 +181,9 @@ public class StreamerUtil {
             .withStorageConfig(HoodieStorageConfig.newBuilder()
                 .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024)
                 .logFileMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024)
+                .parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * 1024 * 1024)
+                .parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 * 1024)
+                .parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 * 1024L)
                 .build())
             .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                 .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
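
The three parquet options wired in above are plain integer sizes in MB, settable like any other FlinkOptions entry (values below are examples only):

    conf.setInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE, 120);    // parquet row group size
    conf.setInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE, 1);       // parquet page size
    conf.setInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 120); // roll over to a new file beyond this size
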
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 659e022..1890d07 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -245,8 +245,6 @@ public class StreamWriteITCase extends TestLogger {
     RowType rowType =
         (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
-    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
-        new StreamWriteOperatorFactory<>(conf);
 
     JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
         rowType,
@@ -302,8 +300,6 @@ public class StreamWriteITCase extends TestLogger {
     RowType rowType =
         (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
-    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
-        new StreamWriteOperatorFactory<>(conf);
 
     JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
         rowType,
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 0c52241..b403f3c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -23,13 +23,13 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.InsertFunctionWrapper;
 import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -58,6 +58,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -532,11 +533,7 @@ public class TestWriteCopyOnWrite {
 
   @Test
   public void testInsertAllowsDuplication() throws Exception {
-    // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
-    conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
-    conf.setBoolean(FlinkOptions.INSERT_DEDUP, false);
-    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+    InsertFunctionWrapper<RowData> funcWrapper = new InsertFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
     // open the function and ingest data
     funcWrapper.openFunction();
@@ -547,19 +544,16 @@ public class TestWriteCopyOnWrite {
 
     // this triggers the data write and event send
     funcWrapper.checkpointFunction(1);
-    Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
-    assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+    assertNull(funcWrapper.getWriterHelper());
 
     final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
-    final OperatorEvent event2 = funcWrapper.getNextEvent();
-    assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
+    assertThat("The operator expect to send an event", event1, instanceOf(WriteMetadataEvent.class));
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
 
     String instant = funcWrapper.getWriteClient()
-            .getLastPendingInstant(getTableType());
+        .getLastPendingInstant(getTableType());
 
     funcWrapper.checkpointComplete(1);
 
@@ -585,10 +579,8 @@ public class TestWriteCopyOnWrite {
 
     funcWrapper.checkpointFunction(2);
 
-    final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
-    final OperatorEvent event4 = funcWrapper.getNextEvent();
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
+    final OperatorEvent event2 = funcWrapper.getNextEvent(); // remove the first event first
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
     funcWrapper.checkpointComplete(2);
 
     // same with the original base file content.
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
index 32ee725..7a1eaee 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
@@ -77,7 +77,7 @@ public class TestRowDataKeyGen {
     assertThat(keyGen1.getPartitionPath(rowData1), is("par1/1970-01-01T00:00:00.001"));
 
     // null record key and partition path
-    final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE,null, null, 23, null, null);
+    final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, null, 23, null, null);
     assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
     assertThat(keyGen1.getPartitionPath(rowData2), is("default/default"));
     // empty record key and partition path
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
index d53d58e..fe2ddad 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
@@ -53,11 +53,17 @@ public class CompactFunctionWrapper {
   private final IOManager ioManager;
   private final StreamingRuntimeContext runtimeContext;
 
-  /** Function that generates the {@link HoodieCompactionPlan}. */
+  /**
+   * Function that generates the {@link HoodieCompactionPlan}.
+   */
   private CompactionPlanOperator compactionPlanOperator;
-  /** Function that executes the compaction task. */
+  /**
+   * Function that executes the compaction task.
+   */
   private CompactFunction compactFunction;
-  /** Stream sink to handle compaction commits. */
+  /**
+   * Stream sink to handle compaction commits.
+   */
   private CompactionCommitSink commitSink;
 
   public CompactFunctionWrapper(Configuration conf) throws Exception {
@@ -120,7 +126,7 @@ public class CompactFunctionWrapper {
     compactionPlanOperator.notifyCheckpointComplete(checkpointID);
     // collect the CompactCommitEvents
     List<CompactionCommitEvent> compactCommitEvents = new ArrayList<>();
-    for (CompactionPlanEvent event: events) {
+    for (CompactionPlanEvent event : events) {
       compactFunction.processElement(event, null, new Collector<CompactionCommitEvent>() {
         @Override
         public void collect(CompactionCommitEvent event) {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
new file mode 100644
index 0000000..ed23754
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.append.AppendWriteFunction;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class to manipulate the {@link AppendWriteFunction} instance for testing.
+ *
+ * @param <I> Input type
+ */
+public class InsertFunctionWrapper<I> {
+  private final Configuration conf;
+  private final RowType rowType;
+
+  private final StreamingRuntimeContext runtimeContext;
+  private final MockOperatorEventGateway gateway;
+  private final MockOperatorCoordinatorContext coordinatorContext;
+  private final StreamWriteOperatorCoordinator coordinator;
+  private final MockStateInitializationContext stateInitializationContext;
+
+  /**
+   * Append write function.
+   */
+  private AppendWriteFunction<RowData> writeFunction;
+
+  public InsertFunctionWrapper(String tablePath, Configuration conf) {
+    IOManager ioManager = new IOManagerAsync();
+    MockEnvironment environment = new MockEnvironmentBuilder()
+        .setTaskName("mockTask")
+        .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+        .setIOManager(ioManager)
+        .build();
+    this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+    this.gateway = new MockOperatorEventGateway();
+    this.conf = conf;
+    this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
+    // one function
+    this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
+    this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
+    this.stateInitializationContext = new MockStateInitializationContext();
+  }
+
+  public void openFunction() throws Exception {
+    this.coordinator.start();
+    this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+
+    setupWriteFunction();
+  }
+
+  public void invoke(I record) throws Exception {
+    writeFunction.processElement((RowData) record, null, null);
+  }
+
+  public WriteMetadataEvent[] getEventBuffer() {
+    return this.coordinator.getEventBuffer();
+  }
+
+  public OperatorEvent getNextEvent() {
+    return this.gateway.getNextEvent();
+  }
+
+  @SuppressWarnings("rawtypes")
+  public HoodieFlinkWriteClient getWriteClient() {
+    return this.writeFunction.getWriteClient();
+  }
+
+  public void checkpointFunction(long checkpointId) throws Exception {
+    // checkpoint the coordinator first
+    this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
+
+    writeFunction.snapshotState(null);
+    stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
+  }
+
+  public void checkpointComplete(long checkpointId) {
+    stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
+    coordinator.notifyCheckpointComplete(checkpointId);
+  }
+
+  public StreamWriteOperatorCoordinator getCoordinator() {
+    return coordinator;
+  }
+
+  public BulkInsertWriterHelper getWriterHelper() {
+    return this.writeFunction.getWriterHelper();
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void setupWriteFunction() throws Exception {
+    writeFunction = new AppendWriteFunction<>(conf, rowType);
+    writeFunction.setRuntimeContext(runtimeContext);
+    writeFunction.setOperatorEventGateway(gateway);
+    writeFunction.initializeState(this.stateInitializationContext);
+    writeFunction.open(conf);
+
+    // handle the bootstrap event
+    coordinator.handleEventFromOperator(0, getNextEvent());
+  }
+}
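
A typical interaction with the wrapper, roughly as exercised by testInsertAllowsDuplication in TestWriteCopyOnWrite (the base path and row data are placeholders):

    InsertFunctionWrapper<RowData> funcWrapper = new InsertFunctionWrapper<>(basePath, conf);
    funcWrapper.openFunction();
    for (RowData row : rows) {
      funcWrapper.invoke(row);
    }
    funcWrapper.checkpointFunction(1);        // flushes the writer helper and emits the WriteMetadataEvent
    OperatorEvent event = funcWrapper.getNextEvent();
    funcWrapper.getCoordinator().handleEventFromOperator(0, event);
    funcWrapper.checkpointComplete(1);        // lets the coordinator commit the instant
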
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 4ada517..6b6bede 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -95,7 +95,7 @@ public class StreamWriteFunctionWrapper<I> {
   /**
    * Stream write function.
    */
-  private StreamWriteFunction<Object, HoodieRecord<?>, Object> writeFunction;
+  private StreamWriteFunction<HoodieRecord<?>> writeFunction;
 
   private CompactFunctionWrapper compactFunctionWrapper;
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 8096d5e..233e6fa 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -68,6 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class TestStreamReadOperator {
   private static final Map<String, String> EXPECTED = new HashMap<>();
+
   static {
     EXPECTED.put("par1", "+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], +I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1]");
     EXPECTED.put("par2", "+I[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], +I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2]");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index f8fc42a..a04e7bb 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -796,6 +796,35 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
         + "+I[id1, Sophia, 18, 1970-01-01T00:00:05, par5]]", 3);
   }
 
+  @Test
+  void testAppendWrite() {
+    TableEnvironment tableEnv = batchTableEnv;
+    // csv source
+    String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
+    tableEnv.executeSql(csvSourceDDL);
+
+    String hoodieTableDDL = sql("hoodie_sink")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.OPERATION, "insert")
+        .option(FlinkOptions.INSERT_DEDUP, false)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    String insertInto = "insert into hoodie_sink select * from csv_source";
+    execInsertSql(tableEnv, insertInto);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from hoodie_sink").execute().collect());
+    assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+    // apply filters
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from hoodie_sink where uuid > 'id5'").execute().collect());
+    assertRowsEquals(result2, "["
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
@@ -874,7 +903,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   }
 
   private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout)
-          throws InterruptedException {
+      throws InterruptedException {
     tEnv.executeSql("DROP TABLE IF EXISTS sink");
     tEnv.executeSql(sinkDDL);
     TableResult tableResult = tEnv.executeSql("insert into sink " + select);
@@ -883,7 +912,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     tableResult.getJobClient().ifPresent(JobClient::cancel);
     tEnv.executeSql("DROP TABLE IF EXISTS sink");
     return CollectSinkTableFactory.RESULT.values().stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toList());
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
   }
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 6824090..231d42c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -128,18 +128,18 @@ public class TestConfigurations {
     DataType[] fieldTypes = tableSchema.getFieldDataTypes();
     for (int i = 0; i < fieldNames.length; i++) {
       builder.append("  `")
-              .append(fieldNames[i])
-              .append("` ")
-              .append(fieldTypes[i].toString());
+          .append(fieldNames[i])
+          .append("` ")
+          .append(fieldTypes[i].toString());
       if (i != fieldNames.length - 1) {
         builder.append(",");
       }
       builder.append("\n");
     }
     final String withProps = ""
-            + ") with (\n"
-            + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n"
-            + ")";
+        + ") with (\n"
+        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n"
+        + ")";
     builder.append(withProps);
     return builder.toString();
   }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 6b0b71c..8822a6f 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -22,7 +22,8 @@ package org.apache.hudi.utils;
  * Test sql statements.
  */
 public class TestSQL {
-  private TestSQL() {}
+  private TestSQL() {
+  }
 
   public static final String INSERT_T1 = "insert into t1 values\n"
       + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index 7459f3a..af40a8d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -57,12 +57,12 @@ public class TestStreamerUtil {
 
     // Validate the partition fields & preCombineField in hoodie.properties.
     HoodieTableMetaClient metaClient1 = HoodieTableMetaClient.builder()
-            .setBasePath(tempFile.getAbsolutePath())
-            .setConf(new org.apache.hadoop.conf.Configuration())
-            .build();
+        .setBasePath(tempFile.getAbsolutePath())
+        .setConf(new org.apache.hadoop.conf.Configuration())
+        .build();
     assertTrue(metaClient1.getTableConfig().getPartitionFields().isPresent(),
-            "Missing partition columns in the hoodie.properties.");
-    assertArrayEquals(metaClient1.getTableConfig().getPartitionFields().get(), new String[] { "p0", "p1" });
+        "Missing partition columns in the hoodie.properties.");
+    assertArrayEquals(metaClient1.getTableConfig().getPartitionFields().get(), new String[] {"p0", "p1"});
     assertEquals(metaClient1.getTableConfig().getPreCombineField(), "ts");
 
     // Test for non-partitioned table.
@@ -70,9 +70,9 @@ public class TestStreamerUtil {
     FileIOUtils.deleteDirectory(tempFile);
     StreamerUtil.initTableIfNotExists(conf);
     HoodieTableMetaClient metaClient2 = HoodieTableMetaClient.builder()
-            .setBasePath(tempFile.getAbsolutePath())
-            .setConf(new org.apache.hadoop.conf.Configuration())
-            .build();
+        .setBasePath(tempFile.getAbsolutePath())
+        .setConf(new org.apache.hadoop.conf.Configuration())
+        .build();
     assertFalse(metaClient2.getTableConfig().getPartitionFields().isPresent());
   }