Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/28 00:53:25 UTC

[hudi] branch master updated: [HUDI-1522] Add a new pipeline for Flink writer (#2430)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bc0325f  [HUDI-1522] Add a new pipeline for Flink writer (#2430)
bc0325f is described below

commit bc0325f6ea0a734f106f21a2fcd4ead413a6cf7b
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Jan 28 08:53:13 2021 +0800

    [HUDI-1522] Add a new pipeline for Flink writer (#2430)
    
    * [HUDI-1522] Add a new pipeline for Flink writer
---
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  34 ++
 .../hudi/index/state/FlinkInMemoryStateIndex.java  |   4 +-
 .../src/main/avro/HoodieRollbackMetadata.avsc      |   4 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  11 +-
 .../model/OverwriteWithLatestAvroPayload.java      |   9 +-
 .../common/testutils/HoodieTestDataGenerator.java  |   4 +-
 hudi-flink/pom.xml                                 | 103 +++--
 .../java/org/apache/hudi/HoodieFlinkStreamer.java  | 198 ----------
 .../org/apache/hudi/operator/FlinkOptions.java     | 249 ++++++++++++
 .../hudi/operator/InstantGenerateOperator.java     |  10 +-
 .../hudi/operator/KeyedWriteProcessFunction.java   |   6 +-
 .../apache/hudi/operator/StreamWriteFunction.java  | 313 +++++++++++++++
 .../apache/hudi/operator/StreamWriteOperator.java  |  52 +++
 .../operator/StreamWriteOperatorCoordinator.java   | 419 +++++++++++++++++++++
 .../hudi/operator/StreamWriteOperatorFactory.java  |  77 ++++
 .../operator/event/BatchWriteSuccessEvent.java     |  57 +++
 .../hudi/schema/FilebasedSchemaProvider.java       |  17 +-
 .../org/apache/hudi/schema/SchemaProvider.java     |   8 -
 .../main/java/org/apache/hudi/sink/CommitSink.java |   6 +-
 .../JsonStringToHoodieRecordMapFunction.java       |   4 +-
 .../apache/hudi/streamer/FlinkStreamerConfig.java  | 124 ++++++
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  | 105 ++++++
 .../hudi/streamer/HoodieFlinkStreamerV2.java       | 102 +++++
 .../OperationConverter.java}                       |  30 +-
 .../org/apache/hudi/util/AvroSchemaConverter.java  | 147 ++++++++
 .../apache/hudi/util/RowDataToAvroConverters.java  | 309 +++++++++++++++
 .../java/org/apache/hudi/util/StreamerUtil.java    | 167 ++++++--
 .../hudi/operator/StreamWriteFunctionTest.java     | 303 +++++++++++++++
 .../apache/hudi/operator/StreamWriteITCase.java    | 129 +++++++
 .../StreamWriteOperatorCoordinatorTest.java        | 101 +++++
 .../utils/MockFunctionInitializationContext.java   |  48 +++
 .../apache/hudi/operator/utils/MockMapState.java   |  90 +++++
 .../operator/utils/MockOperatorStateStore.java     | 141 +++++++
 .../utils/MockStreamingRuntimeContext.java         | 124 ++++++
 .../operator/utils/StreamWriteFunctionWrapper.java | 122 ++++++
 .../hudi/operator/utils/TestConfigurations.java    |  59 +++
 .../org/apache/hudi/operator/utils/TestData.java   | 164 ++++++++
 .../TestJsonStringToHoodieRecordMapFunction.java   |   2 +-
 .../resources/test_read_schema.avsc}               |  55 +--
 hudi-flink/src/test/resources/test_source.data     |   8 +
 40 files changed, 3569 insertions(+), 346 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index d845b90..e3e0eb4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -25,12 +25,14 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -46,6 +48,7 @@ import com.codahale.metrics.Timer;
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -218,4 +221,35 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     return unCompletedTimeline.getInstants().filter(x -> x.getAction().equals(commitType)).map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList());
   }
+
+  public String getInflightAndRequestedInstant(String tableType) {
+    final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+    HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+    HoodieTimeline unCompletedTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
+    return unCompletedTimeline.getInstants()
+        .filter(x -> x.getAction().equals(commitType))
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toList()).stream()
+        .max(Comparator.naturalOrder())
+        .orElse(null);
+  }
+
+  public String getLastCompletedInstant(String tableType) {
+    final String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+    HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+    HoodieTimeline completedTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+    return completedTimeline.getInstants()
+        .filter(x -> x.getAction().equals(commitType))
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toList()).stream()
+        .max(Comparator.naturalOrder())
+        .orElse(null);
+  }
+
+  public void deletePendingInstant(String tableType, String instant) {
+    HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+    String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+    table.getMetaClient().getActiveTimeline()
+        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+  }
 }
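
For context, a minimal usage sketch of the new instant helpers added above. It is illustrative only and not part of the patch: the wrapper class, method names and the COPY_ON_WRITE choice are assumptions, while the write client calls are the ones introduced in this hunk.

    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.common.model.HoodieTableType;

    public class InstantHelperSketch {

      /** Returns the latest pending (REQUESTED/INFLIGHT) commit instant, or null if none. */
      public static String latestPendingInstant(HoodieFlinkWriteClient<?> writeClient) {
        return writeClient.getInflightAndRequestedInstant(HoodieTableType.COPY_ON_WRITE.name());
      }

      /** Deletes a stale pending instant before a new one is requested. */
      public static void cleanPendingInstant(HoodieFlinkWriteClient<?> writeClient) {
        String pending = latestPendingInstant(writeClient);
        if (pending != null) {
          writeClient.deletePendingInstant(HoodieTableType.COPY_ON_WRITE.name(), pending);
        }
      }
    }
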
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
index 4354ea3..44eafd5 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
@@ -54,9 +54,7 @@ public class FlinkInMemoryStateIndex<T extends HoodieRecordPayload> extends Flin
     if (context.getRuntimeContext() != null) {
       MapStateDescriptor<HoodieKey, HoodieRecordLocation> indexStateDesc =
           new MapStateDescriptor<>("indexState", TypeInformation.of(HoodieKey.class), TypeInformation.of(HoodieRecordLocation.class));
-      if (context.getRuntimeContext() != null) {
-        mapState = context.getRuntimeContext().getMapState(indexStateDesc);
-      }
+      mapState = context.getRuntimeContext().getMapState(indexStateDesc);
     }
   }
 
diff --git a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
index a972bfd..f1c9fd5 100644
--- a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
@@ -57,10 +57,10 @@
      /* overlaps with 'commitsRollback' field. Adding this to track action type for all the instants being rolled back. */
      {
        "name": "instantsRollback",
-       "default": null,
+       "default": [],
        "type": {
           "type": "array",
-          "default": null,
+          "default": [],
           "items": "HoodieInstantInfo"
        }
      }
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index ec19fe3..3b989db 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -217,7 +217,7 @@ public class HoodieAvroUtils {
 
   private static Schema initRecordKeySchema() {
     Schema.Field recordKeyField =
-        new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
+        new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
     Schema recordKeySchema = Schema.createRecord("HoodieRecordKey", "", "", false);
     recordKeySchema.setFields(Collections.singletonList(recordKeyField));
     return recordKeySchema;
@@ -263,9 +263,9 @@ public class HoodieAvroUtils {
    */
   public static Schema appendNullSchemaFields(Schema schema, List<String> newFieldNames) {
     List<Field> newFields = schema.getFields().stream()
-        .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultValue())).collect(Collectors.toList());
+        .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultVal())).collect(Collectors.toList());
     for (String newField : newFieldNames) {
-      newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()));
+      newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE));
     }
     Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
     newSchema.setFields(newFields);
@@ -329,7 +329,8 @@ public class HoodieAvroUtils {
 
   private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field f) {
     // cache the result of oldRecord.get() to save CPU expensive hash lookup
-    Object fieldValue = oldRecord.get(f.name());
+    Schema oldSchema = oldRecord.getSchema();
+    Object fieldValue = oldSchema.getField(f.name()) == null ? null : oldRecord.get(f.name());
     if (fieldValue == null) {
       if (f.defaultVal() instanceof JsonProperties.Null) {
         newRecord.put(f.name(), null);
@@ -381,7 +382,7 @@ public class HoodieAvroUtils {
         throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
             + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
       } else {
-        projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
+        projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
       }
     }
 
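
For reference, a small standalone sketch of the Avro API shift these hunks apply (JsonProperties.NULL_VALUE replacing NullNode.getInstance(), defaultVal() replacing the removed defaultValue()). The record and field names below are made up; only the Avro calls mirror the patch.

    import org.apache.avro.JsonProperties;
    import org.apache.avro.Schema;

    import java.util.Arrays;
    import java.util.Collections;

    public class AvroNullDefaultSketch {

      public static Schema recordWithNullableField() {
        // Union with null first so that a null default is legal.
        Schema nullableString = Schema.createUnion(
            Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
        // Newer Avro expresses a null default with the JsonProperties.NULL_VALUE sentinel.
        Schema.Field field =
            new Schema.Field("comment", nullableString, "optional field", JsonProperties.NULL_VALUE);
        Schema record = Schema.createRecord("Example", "", "sketch.ns", false);
        record.setFields(Collections.singletonList(field));
        // Defaults are read back via defaultVal() rather than the removed defaultValue().
        Object commentDefault = record.getField("comment").defaultVal();
        return record;
      }
    }
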
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index e1e6124..dd1853d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -79,7 +79,14 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
    * @returns {@code true} if record represents a delete record. {@code false} otherwise.
    */
   protected boolean isDeleteRecord(GenericRecord genericRecord) {
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    final String isDeleteKey = "_hoodie_is_deleted";
+    // Modified to be compatible with newer Avro versions:
+    // newer Avro throws from GenericRecord.get if the field name
+    // does not exist in the schema.
+    if (genericRecord.getSchema().getField(isDeleteKey) == null) {
+      return false;
+    }
+    Object deleteMarker = genericRecord.get(isDeleteKey);
     return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
   }
 
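
A hypothetical check, not part of the patch, showing why the schema guard above matters: for a payload whose schema has no `_hoodie_is_deleted` field, the guarded lookup treats the record as a regular record instead of letting newer Avro throw from get().

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class DeleteMarkerCompatSketch {

      public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
                + "{\"name\":\"uuid\",\"type\":\"string\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("uuid", "id-1");

        // Guarded lookup: evaluates to false for schemas without the marker field,
        // instead of throwing from record.get("_hoodie_is_deleted").
        boolean isDelete = schema.getField("_hoodie_is_deleted") != null
            && Boolean.TRUE.equals(record.get("_hoodie_is_deleted"));
        System.out.println("isDelete = " + isDelete);
      }
    }
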
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 17e93fe..8017bc3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -98,8 +98,8 @@ public class HoodieTestDataGenerator {
       + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},";
   public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"},"
       + "{\"name\": \"currency\", \"type\": \"string\"},";
-  public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"default\": null, \"type\": {\"type\": "
-      + "\"array\", \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": ["
+  public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": "
+      + "\"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": ["
       + "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},";
   public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},";
   public static final String EXTRA_TYPE_SCHEMA = "{\"name\": \"distance_in_meters\", \"type\": \"int\"},"
diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml
index 3d100c7..2a0f395 100644
--- a/hudi-flink/pom.xml
+++ b/hudi-flink/pom.xml
@@ -123,28 +123,77 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-avro</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-json</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-common</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
 
     <!-- Hadoop -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
       <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-auth</artifactId>
       <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <!-- Avro -->
     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
+      <!-- Override the version to be the same as the Flink Avro version -->
+      <version>1.10.0</version>
       <scope>compile</scope>
     </dependency>
 
@@ -160,6 +209,12 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
       <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
@@ -173,7 +228,9 @@
       <version>0.9.7</version>
     </dependency>
 
-    <!-- Junit Test Suite -->
+    <!-- test dependencies -->
+
+    <!-- Junit 5 dependencies -->
     <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-api</artifactId>
@@ -194,28 +251,7 @@
       <artifactId>junit-jupiter-params</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-junit-jupiter</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.junit.platform</groupId>
-      <artifactId>junit-platform-runner</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.junit.platform</groupId>
-      <artifactId>junit-platform-suite-api</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.junit.platform</groupId>
-      <artifactId>junit-platform-commons</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <!-- Hoodie - Test -->
+    <!-- Hoodie dependencies -->
     <dependency>
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-common</artifactId>
@@ -240,14 +276,7 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-hadoop-mr</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <!-- Flink - Tests -->
+    <!-- Flink dependencies -->
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
@@ -259,15 +288,21 @@
       <artifactId>flink-runtime_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>test</scope>
-      <classifier>tests</classifier>
+      <type>test-jar</type>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>test</scope>
-      <classifier>tests</classifier>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
     </dependency>
-
   </dependencies>
 </project>
diff --git a/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java
deleted file mode 100644
index b65b43c..0000000
--- a/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.operator.InstantGenerateOperator;
-import org.apache.hudi.operator.KeyedWriteProcessFunction;
-import org.apache.hudi.operator.KeyedWriteProcessOperator;
-import org.apache.hudi.sink.CommitSink;
-import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
-import org.apache.hudi.util.StreamerUtil;
-
-import com.beust.jcommander.IStringConverter;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * An Utility which can incrementally consume data from Kafka and apply it to the target table.
- * currently, it only support COW table and insert, upsert operation.
- */
-public class HoodieFlinkStreamer {
-  public static void main(String[] args) throws Exception {
-    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-    final Config cfg = new Config();
-    JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
-      cmd.usage();
-      System.exit(1);
-    }
-    env.enableCheckpointing(cfg.checkpointInterval);
-    env.getConfig().setGlobalJobParameters(cfg);
-    // We use checkpoint to trigger write operation, including instant generating and committing,
-    // There can only be one checkpoint at one time.
-    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-    env.disableOperatorChaining();
-
-    if (cfg.flinkCheckPointPath != null) {
-      env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
-    }
-
-    TypedProperties props = StreamerUtil.getProps(cfg);
-
-    // add kafka config
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cfg.kafkaBootstrapServers);
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, cfg.kafkaGroupId);
-
-    // add data source config
-    props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName);
-    props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField);
-
-    // Read from kafka source
-    DataStream<HoodieRecord> inputRecords =
-        env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props))
-            .filter(Objects::nonNull)
-            .map(new JsonStringToHoodieRecordMapFunction(props))
-            .name("kafka_to_hudi_record")
-            .uid("kafka_to_hudi_record_uid");
-
-    // InstantGenerateOperator helps to emit globally unique instantTime
-    inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
-        .name("instant_generator")
-        .uid("instant_generator_id")
-
-        // Keyby partition path, to avoid multiple subtasks writing to a partition at the same time
-        .keyBy(HoodieRecord::getPartitionPath)
-
-        // write operator, where the write operation really happens
-        .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
-        }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
-        .name("write_process")
-        .uid("write_process_uid")
-        .setParallelism(env.getParallelism())
-
-        // Commit can only be executed once, so make it one parallelism
-        .addSink(new CommitSink())
-        .name("commit_sink")
-        .uid("commit_sink_uid")
-        .setParallelism(1);
-
-    env.execute(cfg.targetTableName);
-  }
-
-  public static class Config extends Configuration {
-    @Parameter(names = {"--kafka-topic"}, description = "kafka topic", required = true)
-    public String kafkaTopic;
-
-    @Parameter(names = {"--kafka-group-id"}, description = "kafka consumer group id", required = true)
-    public String kafkaGroupId;
-
-    @Parameter(names = {"--kafka-bootstrap-servers"}, description = "kafka bootstrap.servers", required = true)
-    public String kafkaBootstrapServers;
-
-    @Parameter(names = {"--flink-checkpoint-path"}, description = "flink checkpoint path")
-    public String flinkCheckPointPath;
-
-    @Parameter(names = {"--flink-block-retry-times"}, description = "Times to retry when latest instant has not completed")
-    public String blockRetryTime = "10";
-
-    @Parameter(names = {"--flink-block-retry-interval"}, description = "Seconds between two tries when latest instant has not completed")
-    public String blockRetryInterval = "1";
-
-    @Parameter(names = {"--target-base-path"},
-        description = "base path for the target hoodie table. "
-            + "(Will be created if did not exist first time around. If exists, expected to be a hoodie table)",
-        required = true)
-    public String targetBasePath;
-
-    @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
-    public String targetTableName;
-
-    @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
-    public String tableType;
-
-    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
-        + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
-        + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
-        + "to individual classes, for supported properties.")
-    public String propsFilePath =
-        "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
-
-    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
-        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter.")
-    public List<String> configs = new ArrayList<>();
-
-    @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
-        + " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
-    public String sourceOrderingField = "ts";
-
-    @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off "
-        + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
-    public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
-
-    @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
-        + "is purely new data/inserts to gain speed)", converter = OperationConverter.class)
-    public WriteOperationType operation = WriteOperationType.UPSERT;
-
-    @Parameter(names = {"--filter-dupes"},
-        description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
-    public Boolean filterDupes = false;
-
-    @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
-    public Boolean commitOnErrors = false;
-
-    /**
-     * Flink checkpoint interval.
-     */
-    @Parameter(names = {"--checkpoint-interval"}, description = "Flink checkpoint interval.")
-    public Long checkpointInterval = 1000 * 5L;
-
-    @Parameter(names = {"--help", "-h"}, help = true)
-    public Boolean help = false;
-  }
-
-  private static class OperationConverter implements IStringConverter<WriteOperationType> {
-
-    @Override
-    public WriteOperationType convert(String value) throws ParameterException {
-      return WriteOperationType.valueOf(value);
-    }
-  }
-}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
new file mode 100644
index 0000000..655fd1a
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hoodie Flink config options.
+ *
+ * <p>It has the options for Hoodie table read and write. It also defines some utilities.
+ */
+public class FlinkOptions {
+  private FlinkOptions() {
+  }
+
+  // ------------------------------------------------------------------------
+  //  Base Options
+  // ------------------------------------------------------------------------
+  public static final ConfigOption<String> PATH = ConfigOptions
+      .key("path")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Base path for the target hoodie table."
+          + "\nThe path would be created if it does not exist,\n"
+          + "otherwise a Hoodie table expects to be initialized successfully");
+
+  // ------------------------------------------------------------------------
+  //  Read Options
+  // ------------------------------------------------------------------------
+  public static final ConfigOption<String> READ_SCHEMA_FILE_PATH = ConfigOptions
+      .key("read.schema.file.path")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Avro schema file path, the parsed schema is used for deserializing");
+
+  // ------------------------------------------------------------------------
+  //  Write Options
+  // ------------------------------------------------------------------------
+  public static final ConfigOption<String> TABLE_NAME = ConfigOptions
+      .key(HoodieWriteConfig.TABLE_NAME)
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Table name to register to Hive metastore");
+
+  public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
+      .key("write.table.type")
+      .stringType()
+      .defaultValue(HoodieTableType.COPY_ON_WRITE.name())
+      .withDescription("Type of table to write, COPY_ON_WRITE (or) MERGE_ON_READ");
+
+  public static final ConfigOption<String> OPERATION = ConfigOptions
+      .key("write.operation")
+      .stringType()
+      .defaultValue("upsert")
+      .withDescription("The write operation, that this write should do");
+
+  public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
+      .key("write.precombine.field")
+      .stringType()
+      .defaultValue("ts")
+      .withDescription("Field used in preCombining before actual write. When two records have the same\n"
+          + "key value, we will pick the one with the largest value for the precombine field,\n"
+          + "determined by Object.compareTo(..)");
+
+  public static final ConfigOption<String> PAYLOAD_CLASS = ConfigOptions
+      .key("write.payload.class")
+      .stringType()
+      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+      .withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
+          + "This will render any value set for the option in-effective");
+
+  /**
+   * Flag to indicate whether to drop duplicates upon insert.
+   * By default insert will accept duplicates, to gain extra performance.
+   */
+  public static final ConfigOption<Boolean> INSERT_DROP_DUPS = ConfigOptions
+      .key("write.insert.drop.duplicates")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Flag to indicate whether to drop duplicates upon insert.\n"
+          + "By default insert will accept duplicates, to gain extra performance");
+
+  public static final ConfigOption<Integer> RETRY_TIMES = ConfigOptions
+      .key("write.retry.times")
+      .intType()
+      .defaultValue(3)
+      .withDescription("Flag to indicate how many times streaming job should retry for a failed checkpoint batch.\n"
+          + "By default 3");
+
+  public static final ConfigOption<Long> RETRY_INTERVAL_MS = ConfigOptions
+      .key("write.retry.interval.ms")
+      .longType()
+      .defaultValue(2000L)
+      .withDescription("Flag to indicate how long (by millisecond) before a retry should issued for failed checkpoint batch.\n"
+          + "By default 2000 and it will be doubled by every retry");
+
+  public static final ConfigOption<Boolean> IGNORE_FAILED = ConfigOptions
+      .key("write.ignore.failed")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch.\n"
+          + "By default true (in favor of streaming progressing over data integrity)");
+
+  public static final ConfigOption<String> RECORD_KEY_FIELD = ConfigOptions
+      .key(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY)
+      .stringType()
+      .defaultValue("uuid")
+      .withDescription("Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
+          + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+          + "the dot notation eg: `a.b.c`");
+
+  public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions
+      .key(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
+      .stringType()
+      .defaultValue("partition-path")
+      .withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n"
+          + "Actual value obtained by invoking .toString()");
+
+  public static final ConfigOption<String> KEYGEN_CLASS = ConfigOptions
+      .key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP)
+      .stringType()
+      .defaultValue(SimpleAvroKeyGenerator.class.getName())
+      .withDescription("Key generator class, that implements will extract the key out of incoming record");
+
+  public static final ConfigOption<Integer> WRITE_TASK_PARALLELISM = ConfigOptions
+      .key("write.task.parallelism")
+      .intType()
+      .defaultValue(4)
+      .withDescription("Parallelism of tasks that do actual write, default is 4");
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  // Remember to update the set when adding new options.
+  public static final List<ConfigOption<?>> OPTIONAL_OPTIONS = Arrays.asList(
+      TABLE_TYPE, OPERATION, PRECOMBINE_FIELD, PAYLOAD_CLASS, INSERT_DROP_DUPS, RETRY_TIMES,
+      RETRY_INTERVAL_MS, IGNORE_FAILED, RECORD_KEY_FIELD, PARTITION_PATH_FIELD, KEYGEN_CLASS
+  );
+
+  // Prefix for Hoodie specific properties.
+  private static final String PROPERTIES_PREFIX = "properties.";
+
+  /**
+   * Transforms a {@code FlinkStreamerConfig} into {@code Configuration}.
+   * The latter is more suitable for the table APIs. It reads all the properties
+   * in the properties file (set by `--props` option) and cmd line options
+   * (set by `--hoodie-conf` option).
+   */
+  @SuppressWarnings("unchecked, rawtypes")
+  public static org.apache.flink.configuration.Configuration fromStreamerConfig(FlinkStreamerConfig config) {
+    Map<String, String> propsMap = new HashMap<String, String>((Map) StreamerUtil.getProps(config));
+    org.apache.flink.configuration.Configuration conf = fromMap(propsMap);
+
+    conf.setString(FlinkOptions.PATH, config.targetBasePath);
+    conf.setString(READ_SCHEMA_FILE_PATH, config.readSchemaFilePath);
+    conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
+    // copy_on_write works same as COPY_ON_WRITE
+    conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
+    conf.setString(FlinkOptions.OPERATION, config.operation.value());
+    conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
+    conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName);
+    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
+    conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
+    conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
+    conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
+    conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
+    conf.setInteger(FlinkOptions.WRITE_TASK_PARALLELISM, config.writeTaskNum);
+
+    return conf;
+  }
+
+  /**
+   * Collects the config options that start with 'properties.' into a 'key'='value' list.
+   */
+  public static Map<String, String> getHoodieProperties(Map<String, String> options) {
+    final Map<String, String> hoodieProperties = new HashMap<>();
+
+    if (hasPropertyOptions(options)) {
+      options.keySet().stream()
+          .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+          .forEach(key -> {
+            final String value = options.get(key);
+            final String subKey = key.substring((PROPERTIES_PREFIX).length());
+            hoodieProperties.put(subKey, value);
+          });
+    }
+    return hoodieProperties;
+  }
+
+  /**
+   * Collects all the config options; the 'properties.' prefix is removed if the option key starts with it.
+   */
+  public static Configuration flatOptions(Configuration conf) {
+    final Map<String, String> propsMap = new HashMap<>();
+
+    conf.toMap().forEach((key, value) -> {
+      final String subKey = key.startsWith(PROPERTIES_PREFIX)
+          ? key.substring((PROPERTIES_PREFIX).length())
+          : key;
+      propsMap.put(subKey, value);
+    });
+    return fromMap(propsMap);
+  }
+
+  private static boolean hasPropertyOptions(Map<String, String> options) {
+    return options.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
+  }
+
+  /** Creates a new configuration that is initialized with the options of the given map. */
+  private static Configuration fromMap(Map<String, String> map) {
+    final Configuration configuration = new Configuration();
+    map.forEach(configuration::setString);
+    return configuration;
+  }
+}
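
For context, a configuration sketch that is not part of the patch: it shows how the options above might be populated directly and how an extra Hoodie client property can be passed with the 'properties.' prefix that flatOptions(...) strips. All option values are placeholders.

    import org.apache.hudi.operator.FlinkOptions;

    import org.apache.flink.configuration.Configuration;

    public class FlinkOptionsUsageSketch {

      public static Configuration sampleConf() {
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.PATH, "file:///tmp/hoodie_sink");   // placeholder base path
        conf.setString(FlinkOptions.TABLE_NAME, "t1");
        conf.setString(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE");
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid");
        conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");

        // Extra Hoodie client properties ride along with the 'properties.' prefix ...
        conf.setString("properties.hoodie.insert.shuffle.parallelism", "4");
        // ... and are exposed without the prefix after flattening.
        Configuration flat = FlinkOptions.flatOptions(conf);
        return flat;
      }
    }
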
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
index 103aef5..5c9930d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.operator;
 
-import org.apache.hudi.HoodieFlinkStreamer;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
@@ -66,7 +66,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
 
-  private HoodieFlinkStreamer.Config cfg;
+  private FlinkStreamerConfig cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
@@ -94,13 +94,13 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
   public void open() throws Exception {
     super.open();
     // get configs from runtimeContext
-    cfg = (HoodieFlinkStreamer.Config) runtimeContext.getExecutionConfig().getGlobalJobParameters();
+    cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
     // retry times
-    retryTimes = Integer.valueOf(cfg.blockRetryTime);
+    retryTimes = Integer.valueOf(cfg.instantRetryTimes);
 
     // retry interval
-    retryInterval = Integer.valueOf(cfg.blockRetryInterval);
+    retryInterval = Integer.valueOf(cfg.instantRetryInterval);
 
     // hadoopConf
     serializableHadoopConf = new SerializableConfiguration(StreamerUtil.getHadoopConf());
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
index d3ebddf..a59a995 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.operator;
 
-import org.apache.hudi.HoodieFlinkStreamer;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -77,7 +77,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
   /**
    * Job conf.
    */
-  private HoodieFlinkStreamer.Config cfg;
+  private FlinkStreamerConfig cfg;
 
   /**
    * Write Client.
@@ -90,7 +90,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
 
     indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
 
-    cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+    cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
     HoodieFlinkEngineContext context =
         new HoodieFlinkEngineContext(new SerializableConfiguration(new org.apache.hadoop.conf.Configuration()), new FlinkTaskContextSupplier(getRuntimeContext()));
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
new file mode 100644
index 0000000..34a61d4
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+
+/**
+ * Sink function to write the data to the underlying filesystem.
+ *
+ * <p><h2>Work Flow</h2>
+ *
+ * <p>The function first buffers the data as a batch of {@link HoodieRecord}s.
+ * It flushes (writes) the record batch when a Flink checkpoint starts. After a batch has been written successfully,
+ * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
+ *
+ * <p><h2>Exactly-once Semantics</h2>
+ *
+ * <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
+ * starts a new instant on the timeline when a checkpoint triggers; the coordinator checkpoint always
+ * starts before its operator's, so when this function starts a checkpoint, a REQUESTED instant already exists.
+ * The function's processing thread then blocks data buffering while the checkpoint thread flushes the existing data buffer.
+ * Once the existing data buffer is written successfully, the processing thread unblocks and starts buffering again for the next checkpoint round.
+ * Because any checkpoint failure triggers a rollback of the write, this gives exactly-once semantics.
+ *
+ * <p><h2>Fault Tolerance</h2>
+ *
+ * <p>The operator coordinator checks the validity of the last instant when it starts a new one. The operator rolls back
+ * the written data and throws when any error occurs. This means any checkpoint or task failure triggers a failover.
+ * The operator coordinator retries several times when committing the write status.
+ *
+ * <p>Note: The function task requires the input stream to be partitioned by the partition fields to avoid different write tasks
+ * writing to the same file group and conflicting. The partition path is typically a datetime field,
+ * so the sink task is likely to become an IO bottleneck; a more flexible solution is to shuffle the
+ * data by the file group IDs.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O> implements CheckpointedFunction {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);
+
+  /**
+   * Write buffer for a checkpoint.
+   */
+  private transient List<HoodieRecord> buffer;
+
+  /**
+   * The buffer lock to control data buffering/flushing.
+   */
+  private transient ReentrantLock bufferLock;
+
+  /**
+   * The condition to decide whether to add new records into the buffer.
+   */
+  private transient Condition addToBufferCondition;
+
+  /**
+   * Flag saying whether there is an on-going checkpoint.
+   */
+  private volatile boolean onCheckpointing = false;
+
+  /**
+   * Config options.
+   */
+  private final Configuration config;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
+
+  /**
+   * HoodieKey generator.
+   */
+  private transient KeyGenerator keyGenerator;
+
+  /**
+   * Row type of the input.
+   */
+  private final RowType rowType;
+
+  /**
+   * Avro schema of the input.
+   */
+  private final Schema avroSchema;
+
+  private transient RowDataToAvroConverters.RowDataToAvroConverter converter;
+
+  /**
+   * The REQUESTED instant we write the data.
+   */
+  private volatile String currentInstant;
+
+  /**
+   * Gateway to send operator events to the operator coordinator.
+   */
+  private transient OperatorEventGateway eventGateway;
+
+  /**
+   * Constructs a StreamWriteFunction.
+   *
+   * @param rowType The input row type
+   * @param config  The config options
+   */
+  public StreamWriteFunction(RowType rowType, Configuration config) {
+    this.rowType = rowType;
+    this.avroSchema = StreamerUtil.getSourceSchema(config);
+    this.config = config;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
+    this.converter = RowDataToAvroConverters.createConverter(this.rowType);
+    initBuffer();
+    initWriteClient();
+    initWriteFunction();
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) {
+    // no operation
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+    bufferLock.lock();
+    try {
+      // The coordinator starts the checkpoint first,
+      // so it has already checked the validity of the last instant.
+      this.onCheckpointing = true;
+      this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
+      Preconditions.checkNotNull(this.currentInstant,
+          "No inflight instant when flushing data");
+      // wait for the buffer data flush out and request a new instant
+      flushBuffer();
+      // signal the task thread to start buffering
+      addToBufferCondition.signal();
+    } finally {
+      this.onCheckpointing = false;
+      bufferLock.unlock();
+    }
+  }
+
+  @Override
+  public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) throws Exception {
+    bufferLock.lock();
+    try {
+      if (onCheckpointing) {
+        addToBufferCondition.await();
+      }
+      this.buffer.add(toHoodieRecord(value));
+    } finally {
+      bufferLock.unlock();
+    }
+  }
+
+  @Override
+  public void close() {
+    if (this.writeClient != null) {
+      this.writeClient.close();
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Getter/Setter
+  // -------------------------------------------------------------------------
+
+  @VisibleForTesting
+  @SuppressWarnings("rawtypes")
+  public List<HoodieRecord> getBuffer() {
+    return buffer;
+  }
+
+  @VisibleForTesting
+  @SuppressWarnings("rawtypes")
+  public HoodieFlinkWriteClient getWriteClient() {
+    return writeClient;
+  }
+
+  public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
+    this.eventGateway = operatorEventGateway;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void initBuffer() {
+    this.buffer = new ArrayList<>();
+    this.bufferLock = new ReentrantLock();
+    this.addToBufferCondition = this.bufferLock.newCondition();
+  }
+
+  private void initWriteClient() {
+    HoodieFlinkEngineContext context =
+        new HoodieFlinkEngineContext(
+            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
+            new FlinkTaskContextSupplier(getRuntimeContext()));
+
+    writeClient = new HoodieFlinkWriteClient<>(context, StreamerUtil.getHoodieClientConfig(this.config));
+  }
+
+  private void initWriteFunction() {
+    final String writeOperation = this.config.get(FlinkOptions.OPERATION);
+    switch (WriteOperationType.fromValue(writeOperation)) {
+      case INSERT:
+        this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime);
+        break;
+      case UPSERT:
+        this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
+        break;
+      default:
+        throw new RuntimeException("Unsupported write operation : " + writeOperation);
+    }
+  }
+
+  /**
+   * Converts the given record to a {@link HoodieRecord}.
+   *
+   * @param record The input record
+   * @return HoodieRecord based on the configuration
+   * @throws IOException if error occurs
+   */
+  @SuppressWarnings("rawtypes")
+  private HoodieRecord toHoodieRecord(I record) throws IOException {
+    boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
+        || WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
+    GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
+    final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS);
+    Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
+        this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
+    HoodieRecordPayload payload = shouldCombine
+        ? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
+        : StreamerUtil.createPayload(payloadClazz, gr);
+    return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
+  }
+
+  private void flushBuffer() {
+    final List<WriteStatus> writeStatus;
+    if (buffer.size() > 0) {
+      writeStatus = writeFunction.apply(buffer, currentInstant);
+      buffer.clear();
+    } else {
+      LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
+      writeStatus = Collections.emptyList();
+    }
+    this.eventGateway.sendEventToCoordinator(new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus));
+    this.currentInstant = "";
+  }
+}
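
    Note for reviewers: the function above buffers incoming records between two checkpoints, flushes the
    buffer when the state snapshot starts, and then acknowledges the flush to the coordinator. Below is a
    minimal, self-contained sketch of that buffer-and-flush pattern; the class and callback names are
    simplified stand-ins for illustration, not the Hudi API.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.function.BiFunction;
    import java.util.function.Consumer;

    // Simplified sketch of the buffer-and-flush-on-checkpoint pattern (hypothetical types).
    final class BufferingWriter<T> {
      private final List<T> buffer = new ArrayList<>();
      private final BiFunction<List<T>, String, List<String>> write; // write(records, instant) -> statuses
      private final Consumer<List<String>> ack;                      // e.g. send an event to the coordinator

      BufferingWriter(BiFunction<List<T>, String, List<String>> write, Consumer<List<String>> ack) {
        this.write = write;
        this.ack = ack;
      }

      void processElement(T record) {
        buffer.add(record); // records pile up between two checkpoints
      }

      void snapshotState(String instant) {
        List<String> statuses = buffer.isEmpty()
            ? Collections.emptyList()
            : write.apply(new ArrayList<>(buffer), instant);
        buffer.clear();
        ack.accept(statuses); // always acknowledge, even for an empty batch, so the coordinator can decide
      }
    }
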
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java
new file mode 100644
index 0000000..3f4d940
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Operator for {@link StreamSink}.
+ *
+ * @param <I> The input type
+ */
+public class StreamWriteOperator<I>
+    extends KeyedProcessOperator<Object, I, Object>
+    implements OperatorEventHandler {
+  private final StreamWriteFunction<Object, I, Object> sinkFunction;
+
+  public StreamWriteOperator(RowType rowType, Configuration conf) {
+    super(new StreamWriteFunction<>(rowType, conf));
+    this.sinkFunction = (StreamWriteFunction<Object, I, Object>) getUserFunction();
+  }
+
+  @Override
+  public void handleOperatorEvent(OperatorEvent operatorEvent) {
+    // do nothing
+  }
+
+  void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
+    sinkFunction.setOperatorEventGateway(operatorEventGateway);
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
new file mode 100644
index 0000000..524c601
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * {@link OperatorCoordinator} for {@link StreamWriteFunction}.
+ *
+ * <p>This coordinator starts a new instant when a new checkpoint starts. It commits the instant when all the
+ * operator tasks write the buffer successfully for a round of checkpoint.
+ *
+ * <p>If there is no data for a round of checkpointing, it cleans up the pending instant metadata instead of committing.
+ *
+ * @see StreamWriteFunction for the work flow and semantics
+ */
+public class StreamWriteOperatorCoordinator
+    implements OperatorCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamWriteOperatorCoordinator.class);
+
+  /**
+   * Config options.
+   */
+  private final Configuration conf;
+
+  /**
+   * Write client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  private long inFlightCheckpoint = -1;
+
+  /**
+   * Current REQUESTED instant, for validation.
+   */
+  private String inFlightInstant = "";
+
+  /**
+   * Event buffer for one round of checkpointing. When all the elements are non-null and have the same
+   * write instant, the instant succeeds and we can commit it.
+   */
+  private transient BatchWriteSuccessEvent[] eventBuffer;
+
+  /**
+   * Task number of the operator.
+   */
+  private final int parallelism;
+
+  /**
+   * Constructs a StreamWriteOperatorCoordinator.
+   *
+   * @param conf        The config options
+   * @param parallelism The operator task number
+   */
+  public StreamWriteOperatorCoordinator(
+      Configuration conf,
+      int parallelism) {
+    this.conf = conf;
+    this.parallelism = parallelism;
+  }
+
+  @Override
+  public void start() throws Exception {
+    // initialize event buffer
+    reset();
+    // writeClient
+    initWriteClient();
+    // init table, create it if not exists.
+    initTable();
+  }
+
+  @Override
+  public void close() {
+    if (writeClient != null) {
+      writeClient.close();
+    }
+    this.eventBuffer = null;
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
+    try {
+      final String errMsg = "A new checkpoint starts while the last checkpoint buffer"
+          + " data has not finish writing, roll back the last write and throw";
+      checkAndForceCommit(errMsg);
+      this.inFlightInstant = this.writeClient.startCommit();
+      this.inFlightCheckpoint = checkpointId;
+      LOG.info("Create instant [{}], at checkpoint [{}]", this.inFlightInstant, checkpointId);
+      result.complete(writeCheckpointBytes());
+    } catch (Throwable throwable) {
+      // when a checkpoint fails, throws directly.
+      result.completeExceptionally(
+          new CompletionException(
+              String.format("Failed to checkpoint Instant %s for source %s",
+                  this.inFlightInstant, this.getClass().getSimpleName()), throwable));
+    }
+  }
+
+  @Override
+  public void checkpointComplete(long checkpointId) {
+    // start to commit the instant.
+    checkAndCommitWithRetry();
+  }
+
+  public void notifyCheckpointAborted(long checkpointId) {
+    Preconditions.checkState(inFlightCheckpoint == checkpointId,
+        "The aborted checkpoint should always be the last checkpoint");
+    checkAndForceCommit("The last checkpoint was aborted, roll back the last write and throw");
+  }
+
+  @Override
+  public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception {
+    if (checkpointData != null) {
+      // restore when any checkpoint completed
+      deserializeCheckpointAndRestore(checkpointData);
+    }
+  }
+
+  @Override
+  public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
+    // only the BatchWriteSuccessEvent is expected here
+    Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
+        "The coordinator can only handle BatchWriteSuccessEvent");
+    BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
+    Preconditions.checkState(event.getInstantTime().equals(this.inFlightInstant),
+        String.format("Receive an unexpected event for instant %s from task %d",
+            event.getInstantTime(), event.getTaskID()));
+    this.eventBuffer[event.getTaskID()] = event;
+  }
+
+  @Override
+  public void subtaskFailed(int i, @Nullable Throwable throwable) {
+    // no operation
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  @SuppressWarnings("rawtypes")
+  private void initWriteClient() {
+    writeClient = new HoodieFlinkWriteClient(
+        new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)),
+        StreamerUtil.getHoodieClientConfig(this.conf),
+        true);
+  }
+
+  private void initTable() throws IOException {
+    final String basePath = this.conf.getString(FlinkOptions.PATH);
+    final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
+    // Hadoop FileSystem
+    try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
+      if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
+        HoodieTableMetaClient.initTableType(
+            hadoopConf,
+            basePath,
+            HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)),
+            this.conf.getString(FlinkOptions.TABLE_NAME),
+            "archived",
+            this.conf.getString(FlinkOptions.PAYLOAD_CLASS),
+            1);
+        LOG.info("Table initialized");
+      } else {
+        LOG.info("Table [{}/{}] already exists, no need to initialize the table",
+            basePath, this.conf.getString(FlinkOptions.TABLE_NAME));
+      }
+    }
+  }
+
+  static byte[] readBytes(DataInputStream in, int size) throws IOException {
+    byte[] bytes = new byte[size];
+    in.readFully(bytes);
+    return bytes;
+  }
+
+  /**
+   * Serialize the coordinator state. The current implementation may not be super efficient,
+   * but it should not matter that much because most of the state should be rather small.
+   * Large states themselves may already be a problem regardless of how the serialization
+   * is implemented.
+   *
+   * @return A byte array containing the serialized state of the operator coordinator.
+   * @throws IOException When something goes wrong in serialization.
+   */
+  private byte[] writeCheckpointBytes() throws IOException {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
+
+      out.writeLong(this.inFlightCheckpoint);
+      byte[] serializedInstant = this.inFlightInstant.getBytes();
+      out.writeInt(serializedInstant.length);
+      out.write(serializedInstant);
+      out.flush();
+      return baos.toByteArray();
+    }
+  }
+
+  /**
+   * Restores the state of this operator coordinator from the state bytes.
+   *
+   * @param bytes The checkpoint bytes that were returned from {@link #writeCheckpointBytes()}
+   * @throws Exception When the deserialization failed.
+   */
+  private void deserializeCheckpointAndRestore(byte[] bytes) throws Exception {
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+         DataInputStream in = new DataInputViewStreamWrapper(bais)) {
+      long checkpointID = in.readLong();
+      int serializedInstantSize = in.readInt();
+      byte[] serializedInstant = readBytes(in, serializedInstantSize);
+      this.inFlightCheckpoint = checkpointID;
+      this.inFlightInstant = new String(serializedInstant);
+    }
+  }
+
+  private void reset() {
+    this.inFlightInstant = "";
+    this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism];
+  }
+
+  private void checkAndForceCommit(String errMsg) {
+    if (!checkReady()) {
+      // forced but still has inflight instant
+      String inflightInstant = writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE));
+      if (inflightInstant != null) {
+        assert inflightInstant.equals(this.inFlightInstant);
+        writeClient.rollback(this.inFlightInstant);
+        throw new HoodieException(errMsg);
+      }
+      if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
+        // The last checkpoint finished successfully.
+        return;
+      }
+    }
+    doCommit();
+  }
+
+  private void checkAndCommitWithRetry() {
+    int retryTimes = this.conf.getInteger(FlinkOptions.RETRY_TIMES);
+    if (retryTimes < 0) {
+      retryTimes = 1;
+    }
+    long retryIntervalMillis = this.conf.getLong(FlinkOptions.RETRY_INTERVAL_MS);
+    int tryTimes = 0;
+    while (tryTimes++ < retryTimes) {
+      try {
+        if (!checkReady()) {
+          // Do not throw if the retry attempts are exhausted but the event buffer is still not ready,
+          // because there is a forced check when the next checkpoint starts.
+          sleepFor(retryIntervalMillis);
+          continue;
+        }
+        doCommit();
+        return;
+      } catch (Throwable throwable) {
+        String cause = throwable.getCause() == null ? "" : throwable.getCause().toString();
+        LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.inFlightInstant, tryTimes, cause);
+        if (tryTimes == retryTimes) {
+          throw new HoodieException(throwable);
+        }
+        sleepFor(retryIntervalMillis);
+      }
+    }
+  }
+
+  /**
+   * Sleeps for {@code intervalMillis} milliseconds in the current thread.
+   */
+  private void sleepFor(long intervalMillis) {
+    try {
+      TimeUnit.MILLISECONDS.sleep(intervalMillis);
+    } catch (InterruptedException e) {
+      LOG.error("Thread interrupted while waiting to retry the instant commits");
+      throw new HoodieException(e);
+    }
+  }
+
+  /** Checks whether the event buffer is ready to commit. */
+  private boolean checkReady() {
+    return Arrays.stream(eventBuffer).allMatch(event ->
+        event != null && event.getInstantTime().equals(this.inFlightInstant));
+  }
+
+  /** Performs the actual commit action. */
+  private void doCommit() {
+    List<WriteStatus> writeResults = Arrays.stream(eventBuffer)
+        .map(BatchWriteSuccessEvent::getWriteStatuses)
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+
+    if (writeResults.size() == 0) {
+      // No data has been written, clean up the pending instant metadata
+      this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant);
+      reset();
+      return;
+    }
+
+    // commit or rollback
+    long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+    long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
+    boolean hasErrors = totalErrorRecords > 0;
+
+    if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
+      HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+      if (hasErrors) {
+        LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total="
+            + totalErrorRecords + "/" + totalRecords);
+      }
+
+      boolean success = writeClient.commit(this.inFlightInstant, writeResults, Option.of(checkpointCommitMetadata));
+      if (success) {
+        reset();
+        LOG.info("Commit instant [{}] success!", this.inFlightInstant);
+      } else {
+        throw new HoodieException(String.format("Commit instant [%s] failed!", this.inFlightInstant));
+      }
+    } else {
+      LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
+      LOG.error("The first 100 error messages");
+      writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
+        LOG.error("Global error for partition path {} and fileID {}: {}",
+            ws.getPartitionPath(), ws.getFileId(), ws.getGlobalError());
+        if (ws.getErrors().size() > 0) {
+          ws.getErrors().forEach((key, value) -> LOG.trace("Error for key: " + key + " and value: " + value));
+        }
+      });
+      // Rolls back instant
+      writeClient.rollback(this.inFlightInstant);
+      throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.inFlightInstant));
+    }
+  }
+
+  @VisibleForTesting
+  public BatchWriteSuccessEvent[] getEventBuffer() {
+    return eventBuffer;
+  }
+
+  @VisibleForTesting
+  public String getInFlightInstant() {
+    return inFlightInstant;
+  }
+
+  /**
+   * Provider for {@link StreamWriteOperatorCoordinator}.
+   */
+  public static class Provider implements OperatorCoordinator.Provider {
+    private final OperatorID operatorId;
+    private final Configuration conf;
+    private final int numTasks;
+
+    public Provider(OperatorID operatorId, Configuration conf, int numTasks) {
+      this.operatorId = operatorId;
+      this.conf = conf;
+      this.numTasks = numTasks;
+    }
+
+    public OperatorID getOperatorId() {
+      return this.operatorId;
+    }
+
+    public OperatorCoordinator create(Context context) {
+      return new StreamWriteOperatorCoordinator(this.conf, this.numTasks);
+    }
+  }
+}
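
    The commit protocol above hinges on the readiness check: the coordinator keeps one event slot per
    write subtask and commits only when every slot holds an event for the current in-flight instant.
    A small self-contained sketch of that check, with plain strings standing in for the
    BatchWriteSuccessEvent objects (names and timestamps are illustrative only):

    import java.util.Arrays;
    import java.util.Objects;

    // Sketch of the commit-readiness check: one slot per subtask, commit only when all slots
    // report the same in-flight instant.
    final class ReadinessCheck {
      static boolean ready(String[] reportedInstants, String inFlightInstant) {
        return Arrays.stream(reportedInstants)
            .allMatch(instant -> Objects.equals(instant, inFlightInstant));
      }

      public static void main(String[] args) {
        String[] slots = new String[] {"20210128085313", "20210128085313", null};
        System.out.println(ready(slots, "20210128085313")); // false: subtask 2 has not reported yet
        slots[2] = "20210128085313";
        System.out.println(ready(slots, "20210128085313")); // true: safe to commit the instant
      }
    }
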
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
new file mode 100644
index 0000000..f5faa54
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Factory class for {@link StreamWriteOperator}.
+ */
+public class StreamWriteOperatorFactory<I>
+    extends SimpleUdfStreamOperatorFactory<Object>
+    implements CoordinatedOperatorFactory<Object>, OneInputStreamOperatorFactory<I, Object> {
+  private static final long serialVersionUID = 1L;
+
+  private final StreamWriteOperator<I> operator;
+  private final Configuration conf;
+  private final int numTasks;
+
+  public StreamWriteOperatorFactory(
+      RowType rowType,
+      Configuration conf,
+      int numTasks) {
+    super(new StreamWriteOperator<>(rowType, conf));
+    this.operator = (StreamWriteOperator<I>) getOperator();
+    this.conf = conf;
+    this.numTasks = numTasks;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {
+    final OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
+    final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
+
+    this.operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorID));
+    this.operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+    this.operator.setProcessingTimeService(this.processingTimeService);
+    eventDispatcher.registerEventHandler(operatorID, operator);
+    return (T) operator;
+  }
+
+  @Override
+  public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
+    return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, this.numTasks);
+  }
+
+  @Override
+  public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
+    super.setProcessingTimeService(processingTimeService);
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java b/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java
new file mode 100644
index 0000000..db5432e
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.event;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import org.apache.hudi.client.WriteStatus;
+
+import java.util.List;
+
+/**
+ * An operator event that marks a successful batch write for a checkpoint.
+ */
+public class BatchWriteSuccessEvent implements OperatorEvent {
+  private static final long serialVersionUID = 1L;
+
+  private final List<WriteStatus> writeStatuses;
+  private final int taskID;
+  private final String instantTime;
+
+  public BatchWriteSuccessEvent(
+      int taskID,
+      String instantTime,
+      List<WriteStatus> writeStatuses) {
+    this.taskID = taskID;
+    this.instantTime = instantTime;
+    this.writeStatuses = writeStatuses;
+  }
+
+  public List<WriteStatus> getWriteStatuses() {
+    return writeStatuses;
+  }
+
+  public int getTaskID() {
+    return taskID;
+  }
+
+  public String getInstantTime() {
+    return instantTime;
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java b/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
index 82699d9..aed6e17 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
@@ -21,9 +21,11 @@ package org.apache.hudi.schema;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.operator.FlinkOptions;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -43,16 +45,13 @@ public class FilebasedSchemaProvider extends SchemaProvider {
     private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.file";
   }
 
-  private final FileSystem fs;
-
   private final Schema sourceSchema;
 
   private Schema targetSchema;
 
   public FilebasedSchemaProvider(TypedProperties props) {
-    super(props);
     StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
-    this.fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf());
+    FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf());
     try {
       this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP))));
       if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
@@ -64,6 +63,16 @@ public class FilebasedSchemaProvider extends SchemaProvider {
     }
   }
 
+  public FilebasedSchemaProvider(Configuration conf) {
+    final String readSchemaPath = conf.getString(FlinkOptions.READ_SCHEMA_FILE_PATH);
+    final FileSystem fs = FSUtils.getFs(readSchemaPath, StreamerUtil.getHadoopConf());
+    try {
+      this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(readSchemaPath)));
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error reading schema", ioe);
+    }
+  }
+
   @Override
   public Schema getSourceSchema() {
     return sourceSchema;
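
    The new Configuration-based constructor lets the Flink pipeline read the source schema from the path
    configured under FlinkOptions.READ_SCHEMA_FILE_PATH. A usage sketch follows; the schema file path is
    only an example value, and the snippet assumes the option is a plain string option as the
    conf.getString call above implies.

    import org.apache.avro.Schema;
    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.operator.FlinkOptions;
    import org.apache.hudi.schema.FilebasedSchemaProvider;

    public class SchemaProviderExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // example path; point this at an existing Avro schema file
        conf.setString(FlinkOptions.READ_SCHEMA_FILE_PATH, "file:///tmp/test_read_schema.avsc");
        Schema sourceSchema = new FilebasedSchemaProvider(conf).getSourceSchema();
        System.out.println(sourceSchema.toString(true)); // pretty-prints the parsed schema
      }
    }
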
diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java b/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
index 74b4067..5def413 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.schema;
 
-import org.apache.hudi.common.config.TypedProperties;
-
 import org.apache.avro.Schema;
 
 import java.io.Serializable;
@@ -29,12 +27,6 @@ import java.io.Serializable;
  */
 public abstract class SchemaProvider implements Serializable {
 
-  protected TypedProperties config;
-
-  protected SchemaProvider(TypedProperties props) {
-    this.config = props;
-  }
-
   public abstract Schema getSourceSchema();
 
   public Schema getTargetSchema() {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
index 4ca7930..3a12842 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
@@ -25,7 +25,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieFlinkStreamerException;
-import org.apache.hudi.HoodieFlinkStreamer;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -51,7 +51,7 @@ public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus
   /**
    * Job conf.
    */
-  private HoodieFlinkStreamer.Config cfg;
+  private FlinkStreamerConfig cfg;
 
   /**
    * Write client.
@@ -72,7 +72,7 @@ public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     // Get configs from runtimeContext
-    cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+    cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
     writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java
index 4eda371..8d0189c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java
@@ -22,10 +22,10 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieFlinkStreamerException;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieFlinkStreamerException;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.schema.FilebasedSchemaProvider;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
new file mode 100644
index 0000000..7df63fa
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.streamer;
+
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+
+import com.beust.jcommander.Parameter;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Configurations for Hoodie Flink streamer.
+ */
+public class FlinkStreamerConfig extends Configuration {
+  @Parameter(names = {"--kafka-topic"}, description = "Kafka topic name.", required = true)
+  public String kafkaTopic;
+
+  @Parameter(names = {"--kafka-group-id"}, description = "Kafka consumer group id.", required = true)
+  public String kafkaGroupId;
+
+  @Parameter(names = {"--kafka-bootstrap-servers"}, description = "Kafka bootstrap.servers.", required = true)
+  public String kafkaBootstrapServers;
+
+  @Parameter(names = {"--flink-checkpoint-path"}, description = "Flink checkpoint path.")
+  public String flinkCheckPointPath;
+
+  @Parameter(names = {"--instant-retry-times"}, description = "Times to retry when latest instant has not completed.")
+  public String instantRetryTimes = "10";
+
+  @Parameter(names = {"--instant-retry-interval"}, description = "Seconds between two tries when latest instant has not completed.")
+  public String instantRetryInterval = "1";
+
+  @Parameter(names = {"--target-base-path"},
+      description = "Base path for the target hoodie table. "
+          + "(Will be created if did not exist first time around. If exists, expected to be a hoodie table).",
+      required = true)
+  public String targetBasePath;
+
+  @Parameter(names = {"--read-schema-path"},
+      description = "Avro schema file path, the parsed schema is used for deserializing.",
+      required = true)
+  public String readSchemaFilePath;
+
+  @Parameter(names = {"--target-table"}, description = "Name of the target table in Hive.", required = true)
+  public String targetTableName;
+
+  @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true)
+  public String tableType;
+
+  @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for "
+      + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
+      + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
+      + "to individual classes, for supported properties.")
+  public String propsFilePath =
+      "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
+
+  @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+      + "(using the CLI parameter \"--props\") can also be passed command line using this parameter.")
+  public List<String> configs = new ArrayList<>();
+
+  @Parameter(names = {"--record-key-field"}, description = "Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
+      + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+      + "the dot notation eg: `a.b.c`. By default `uuid`.")
+  public String recordKeyField = "uuid";
+
+  @Parameter(names = {"--partition-path-field"}, description = "Partition path field. Value to be used at \n"
+      + "the `partitionPath` component of `HoodieKey`. Actual value obtained by invoking .toString(). By default `partitionpath`.")
+  public String partitionPathField = "partitionpath";
+
+  @Parameter(names = {"--keygen-class"}, description = "Key generator class, that implements will extract the key out of incoming record.\n"
+      + "By default `SimpleAvroKeyGenerator`.")
+  public String keygenClass = SimpleAvroKeyGenerator.class.getName();
+
+  @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
+      + " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record.")
+  public String sourceOrderingField = "ts";
+
+  @Parameter(names = {"--payload-class"}, description = "Subclass of HoodieRecordPayload, that works off "
+      + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value.")
+  public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
+
+  @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
+      + "is purely new data/inserts to gain speed).", converter = OperationConverter.class)
+  public WriteOperationType operation = WriteOperationType.UPSERT;
+
+  @Parameter(names = {"--filter-dupes"},
+      description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert.")
+  public Boolean filterDupes = false;
+
+  @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written.")
+  public Boolean commitOnErrors = false;
+
+  /**
+   * Flink checkpoint interval.
+   */
+  @Parameter(names = {"--checkpoint-interval"}, description = "Flink checkpoint interval.")
+  public Long checkpointInterval = 1000 * 5L;
+
+  @Parameter(names = {"--help", "-h"}, help = true)
+  public Boolean help = false;
+
+  @Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks that do actual write, default is 4.")
+  public Integer writeTaskNum = 4;
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
new file mode 100644
index 0000000..f6d75d3
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.streamer;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.InstantGenerateOperator;
+import org.apache.hudi.operator.KeyedWriteProcessFunction;
+import org.apache.hudi.operator.KeyedWriteProcessOperator;
+import org.apache.hudi.sink.CommitSink;
+import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A utility which can incrementally consume data from Kafka and apply it to the target table.
+ * Currently, it only supports COW tables and the insert and upsert operations.
+ */
+public class HoodieFlinkStreamer {
+  public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    final FlinkStreamerConfig cfg = new FlinkStreamerConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    env.enableCheckpointing(cfg.checkpointInterval);
+    env.getConfig().setGlobalJobParameters(cfg);
+    // We use the checkpoint to trigger the write operation, including instant generation and committing,
+    // so there can only be one checkpoint at a time.
+    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    if (cfg.flinkCheckPointPath != null) {
+      env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
+    }
+
+    TypedProperties props = StreamerUtil.appendKafkaProps(cfg);
+
+    // add data source config
+    props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName);
+    props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField);
+
+    // Read from kafka source
+    DataStream<HoodieRecord> inputRecords =
+        env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props))
+            .filter(Objects::nonNull)
+            .map(new JsonStringToHoodieRecordMapFunction(props))
+            .name("kafka_to_hudi_record")
+            .uid("kafka_to_hudi_record_uid");
+
+    inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
+        .name("instant_generator")
+        .uid("instant_generator_id")
+
+        // Key by partition path, to avoid multiple subtasks writing to a partition at the same time
+        .keyBy(HoodieRecord::getPartitionPath)
+
+        // write operator, where the write operation really happens
+        .transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
+        }), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
+        .name("write_process")
+        .uid("write_process_uid")
+        .setParallelism(env.getParallelism())
+
+        // Commit can only be executed once, so make it one parallelism
+        .addSink(new CommitSink())
+        .name("commit_sink")
+        .uid("commit_sink_uid")
+        .setParallelism(1);
+
+    env.execute(cfg.targetTableName);
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
new file mode 100644
index 0000000..a8f9245
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.streamer;
+
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamWriteOperatorFactory;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.TimestampFormat;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Properties;
+
+/**
+ * A utility which can incrementally consume data from Kafka and apply it to the target table.
+ * Currently, it only supports COW tables and the insert and upsert operations.
+ */
+public class HoodieFlinkStreamerV2 {
+  public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    final FlinkStreamerConfig cfg = new FlinkStreamerConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    env.enableCheckpointing(cfg.checkpointInterval);
+    env.getConfig().setGlobalJobParameters(cfg);
+    // We use the checkpoint to trigger the write operation, including instant generation and committing,
+    // so there can only be one checkpoint at a time.
+    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    if (cfg.flinkCheckPointPath != null) {
+      env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
+    }
+
+    Properties kafkaProps = StreamerUtil.appendKafkaProps(cfg);
+
+    // Read from kafka source
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
+            .getLogicalType();
+    Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
+    int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
+    StreamWriteOperatorFactory<RowData> operatorFactory =
+        new StreamWriteOperatorFactory<>(rowType, conf, numWriteTask);
+
+    int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD));
+    LogicalType partitionFieldType = rowType.getTypeAt(partitionFieldIndex);
+    final RowData.FieldGetter partitionFieldGetter =
+        RowData.createFieldGetter(partitionFieldType, partitionFieldIndex);
+
+    DataStream<Object> dataStream = env.addSource(new FlinkKafkaConsumer<>(
+        cfg.kafkaTopic,
+        new JsonRowDataDeserializationSchema(
+            rowType,
+            new RowDataTypeInfo(rowType),
+            false,
+            true,
+            TimestampFormat.ISO_8601
+        ), kafkaProps))
+        .name("kafka_source")
+        .uid("uid_kafka_source")
+        // Key by partition path, to avoid multiple subtasks writing to a partition at the same time
+        .keyBy(partitionFieldGetter::getFieldOrNull)
+        .transform("hoodie_stream_write", null, operatorFactory)
+        .uid("uid_hoodie_stream_write")
+        .setParallelism(numWriteTask); // should make it configurable
+
+    env.addOperator(dataStream.getTransformation());
+
+    env.execute(cfg.targetTableName);
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java
similarity index 54%
copy from hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
copy to hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java
index 74b4067..9baaf0a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,29 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.schema;
+package org.apache.hudi.streamer;
 
-import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.WriteOperationType;
 
-import org.apache.avro.Schema;
-
-import java.io.Serializable;
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.ParameterException;
 
 /**
- * Class to provide schema for reading data and also writing into a Hoodie table.
+ * Converter that converts a string into enum WriteOperationType.
  */
-public abstract class SchemaProvider implements Serializable {
-
-  protected TypedProperties config;
-
-  protected SchemaProvider(TypedProperties props) {
-    this.config = props;
-  }
-
-  public abstract Schema getSourceSchema();
-
-  public Schema getTargetSchema() {
-    // by default, use source schema as target for hoodie table as well
-    return getSourceSchema();
+public class OperationConverter implements IStringConverter<WriteOperationType> {
+  @Override
+  public WriteOperationType convert(String value) throws ParameterException {
+    return WriteOperationType.valueOf(value);
   }
 }
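
    One note on the converter: it resolves the string against the enum constant name, so the value passed
    to --op must match the constant exactly (e.g. UPSERT or INSERT); an unknown value fails inside
    WriteOperationType.valueOf. A tiny illustrative sketch (the class wrapper is only for demonstration):

    import org.apache.hudi.common.model.WriteOperationType;
    import org.apache.hudi.streamer.OperationConverter;

    public class OperationConverterExample {
      public static void main(String[] args) {
        // matches the enum constant name, so this resolves to WriteOperationType.UPSERT
        WriteOperationType op = new OperationConverter().convert("UPSERT");
        System.out.println(op);
        // a lower-case value such as "upsert" would throw an IllegalArgumentException
        // from WriteOperationType.valueOf
      }
    }
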
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
new file mode 100644
index 0000000..21664f3
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.TypeInformationRawType;
+
+import java.util.List;
+
+/**
+ * Converts an Avro schema into Flink's type information. It uses {@link org.apache.flink.api.java.typeutils.RowTypeInfo} for
+ * representing objects and converts Avro types into types that are compatible with Flink's Table &
+ * SQL API.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime classes
+ * {@link org.apache.flink.formats.avro.AvroRowDeserializationSchema} and {@link org.apache.flink.formats.avro.AvroRowSerializationSchema}.
+ *
+ * <p>NOTE: adapted from Flink release 1.12.0; should be removed once the Flink version is upgraded to 1.12.0 or above.
+ */
+public class AvroSchemaConverter {
+
+  /**
+   * Converts an Avro schema {@code schema} into a nested row structure with deterministic field order and
+   * data types that are compatible with Flink's Table & SQL API.
+   *
+   * @param schema Avro schema definition
+   * @return data type matching the schema
+   */
+  public static DataType convertToDataType(Schema schema) {
+    switch (schema.getType()) {
+      case RECORD:
+        final List<Schema.Field> schemaFields = schema.getFields();
+
+        final DataTypes.Field[] fields = new DataTypes.Field[schemaFields.size()];
+        for (int i = 0; i < schemaFields.size(); i++) {
+          final Schema.Field field = schemaFields.get(i);
+          fields[i] = DataTypes.FIELD(field.name(), convertToDataType(field.schema()));
+        }
+        return DataTypes.ROW(fields).notNull();
+      case ENUM:
+        return DataTypes.STRING().notNull();
+      case ARRAY:
+        return DataTypes.ARRAY(convertToDataType(schema.getElementType())).notNull();
+      case MAP:
+        return DataTypes.MAP(
+            DataTypes.STRING().notNull(),
+            convertToDataType(schema.getValueType()))
+            .notNull();
+      case UNION:
+        final Schema actualSchema;
+        final boolean nullable;
+        if (schema.getTypes().size() == 2
+            && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+          actualSchema = schema.getTypes().get(1);
+          nullable = true;
+        } else if (schema.getTypes().size() == 2
+            && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+          actualSchema = schema.getTypes().get(0);
+          nullable = true;
+        } else if (schema.getTypes().size() == 1) {
+          actualSchema = schema.getTypes().get(0);
+          nullable = false;
+        } else {
+          // use Kryo for serialization
+          return new AtomicDataType(
+              new TypeInformationRawType<>(false, Types.GENERIC(Object.class)));
+        }
+        DataType converted = convertToDataType(actualSchema);
+        return nullable ? converted.nullable() : converted;
+      case FIXED:
+        // logical decimal type
+        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final LogicalTypes.Decimal decimalType =
+              (LogicalTypes.Decimal) schema.getLogicalType();
+          return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale())
+              .notNull();
+        }
+        // convert fixed size binary data to primitive byte arrays
+        return DataTypes.VARBINARY(schema.getFixedSize()).notNull();
+      case STRING:
+        // convert Avro's Utf8/CharSequence to String
+        return DataTypes.STRING().notNull();
+      case BYTES:
+        // logical decimal type
+        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final LogicalTypes.Decimal decimalType =
+              (LogicalTypes.Decimal) schema.getLogicalType();
+          return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale())
+              .notNull();
+        }
+        return DataTypes.BYTES().notNull();
+      case INT:
+        // logical date and time type
+        final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
+        if (logicalType == LogicalTypes.date()) {
+          return DataTypes.DATE().notNull();
+        } else if (logicalType == LogicalTypes.timeMillis()) {
+          return DataTypes.TIME(3).notNull();
+        }
+        return DataTypes.INT().notNull();
+      case LONG:
+        // logical timestamp type
+        if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+          return DataTypes.TIMESTAMP(3).notNull();
+        } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+          return DataTypes.TIMESTAMP(6).notNull();
+        } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
+          return DataTypes.TIME(3).notNull();
+        } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+          return DataTypes.TIME(6).notNull();
+        }
+        return DataTypes.BIGINT().notNull();
+      case FLOAT:
+        return DataTypes.FLOAT().notNull();
+      case DOUBLE:
+        return DataTypes.DOUBLE().notNull();
+      case BOOLEAN:
+        return DataTypes.BOOLEAN().notNull();
+      case NULL:
+        return DataTypes.NULL();
+      default:
+        throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
+    }
+  }
+}
+
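    To see what the converter produces, here is a minimal sketch that turns a two-field Avro record schema
    into a Flink RowType, mirroring how HoodieFlinkStreamerV2 derives the row type above. The inline schema
    and class name are examples only.

    import org.apache.avro.Schema;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.hudi.util.AvroSchemaConverter;

    public class AvroSchemaConverterExample {
      public static void main(String[] args) {
        Schema avroSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"uuid\",\"type\":\"string\"},"
                + "{\"name\":\"ts\",\"type\":\"long\"}]}");
        DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema);
        RowType rowType = (RowType) dataType.getLogicalType();
        // prints something like: ROW<`uuid` STRING NOT NULL, `ts` BIGINT NOT NULL> NOT NULL
        System.out.println(rowType);
      }
    }
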
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
new file mode 100644
index 0000000..d5ff663
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap;
+
+/**
+ * Tool class used to convert from {@link RowData} to Avro {@link GenericRecord}.
+ *
+ * <p>NOTE: adapted from Flink release 1.12.0; should be removed once the Flink dependency is upgraded to that version.
+ */
+@Internal
+public class RowDataToAvroConverters {
+
+  // --------------------------------------------------------------------------------
+  // Runtime Converters
+  // --------------------------------------------------------------------------------
+
+  /**
+   * Runtime converter that converts objects of Flink Table & SQL internal data structures to
+   * corresponding Avro data structures.
+   */
+  @FunctionalInterface
+  public interface RowDataToAvroConverter extends Serializable {
+    Object convert(Schema schema, Object object);
+  }
+
+  // --------------------------------------------------------------------------------
+  // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+  // necessary because the maven shade plugin cannot relocate classes in
+  // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Avro for
+  // sql-client uber jars.
+  // --------------------------------------------------------------------------------
+
+  /**
+   * Creates a runtime converter according to the given logical type that converts objects of
+   * Flink Table & SQL internal data structures to corresponding Avro data structures.
+   */
+  public static RowDataToAvroConverter createConverter(LogicalType type) {
+    final RowDataToAvroConverter converter;
+    switch (type.getTypeRoot()) {
+      case NULL:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return null;
+              }
+            };
+        break;
+      case TINYINT:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ((Byte) object).intValue();
+              }
+            };
+        break;
+      case SMALLINT:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ((Short) object).intValue();
+              }
+            };
+        break;
+      case BOOLEAN: // boolean
+      case INTEGER: // int
+      case INTERVAL_YEAR_MONTH: // long
+      case BIGINT: // long
+      case INTERVAL_DAY_TIME: // long
+      case FLOAT: // float
+      case DOUBLE: // double
+      case TIME_WITHOUT_TIME_ZONE: // int
+      case DATE: // int
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return object;
+              }
+            };
+        break;
+      case CHAR:
+      case VARCHAR:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return new Utf8(object.toString());
+              }
+            };
+        break;
+      case BINARY:
+      case VARBINARY:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ByteBuffer.wrap((byte[]) object);
+              }
+            };
+        break;
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ((TimestampData) object).toInstant().toEpochMilli();
+              }
+            };
+        break;
+      case DECIMAL:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+              }
+            };
+        break;
+      case ARRAY:
+        converter = createArrayConverter((ArrayType) type);
+        break;
+      case ROW:
+        converter = createRowConverter((RowType) type);
+        break;
+      case MAP:
+      case MULTISET:
+        converter = createMapConverter(type);
+        break;
+      case RAW:
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + type);
+    }
+
+    // wrap into nullable converter
+    return new RowDataToAvroConverter() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Object convert(Schema schema, Object object) {
+        if (object == null) {
+          return null;
+        }
+
+        // get actual schema if it is a nullable schema
+        Schema actualSchema;
+        if (schema.getType() == Schema.Type.UNION) {
+          List<Schema> types = schema.getTypes();
+          int size = types.size();
+          if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+            actualSchema = types.get(0);
+          } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+            actualSchema = types.get(1);
+          } else {
+            throw new IllegalArgumentException(
+                "The Avro schema is not a nullable type: " + schema.toString());
+          }
+        } else {
+          actualSchema = schema;
+        }
+        return converter.convert(actualSchema, object);
+      }
+    };
+  }
+
+  private static RowDataToAvroConverter createRowConverter(RowType rowType) {
+    final RowDataToAvroConverter[] fieldConverters =
+        rowType.getChildren().stream()
+            .map(RowDataToAvroConverters::createConverter)
+            .toArray(RowDataToAvroConverter[]::new);
+    final LogicalType[] fieldTypes =
+        rowType.getFields().stream()
+            .map(RowType.RowField::getType)
+            .toArray(LogicalType[]::new);
+    final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.length];
+    for (int i = 0; i < fieldTypes.length; i++) {
+      fieldGetters[i] = RowData.createFieldGetter(fieldTypes[i], i);
+    }
+    final int length = rowType.getFieldCount();
+
+    return new RowDataToAvroConverter() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Object convert(Schema schema, Object object) {
+        final RowData row = (RowData) object;
+        final List<Schema.Field> fields = schema.getFields();
+        final GenericRecord record = new GenericData.Record(schema);
+        for (int i = 0; i < length; ++i) {
+          final Schema.Field schemaField = fields.get(i);
+          Object avroObject =
+              fieldConverters[i].convert(
+                  schemaField.schema(), fieldGetters[i].getFieldOrNull(row));
+          record.put(i, avroObject);
+        }
+        return record;
+      }
+    };
+  }
+
+  private static RowDataToAvroConverter createArrayConverter(ArrayType arrayType) {
+    LogicalType elementType = arrayType.getElementType();
+    final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
+    final RowDataToAvroConverter elementConverter = createConverter(arrayType.getElementType());
+
+    return new RowDataToAvroConverter() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Object convert(Schema schema, Object object) {
+        final Schema elementSchema = schema.getElementType();
+        ArrayData arrayData = (ArrayData) object;
+        List<Object> list = new ArrayList<>();
+        for (int i = 0; i < arrayData.size(); ++i) {
+          list.add(
+              elementConverter.convert(
+                  elementSchema, elementGetter.getElementOrNull(arrayData, i)));
+        }
+        return list;
+      }
+    };
+  }
+
+  private static RowDataToAvroConverter createMapConverter(LogicalType type) {
+    LogicalType valueType = extractValueTypeToAvroMap(type);
+    final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
+    final RowDataToAvroConverter valueConverter = createConverter(valueType);
+
+    return new RowDataToAvroConverter() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Object convert(Schema schema, Object object) {
+        final Schema valueSchema = schema.getValueType();
+        final MapData mapData = (MapData) object;
+        final ArrayData keyArray = mapData.keyArray();
+        final ArrayData valueArray = mapData.valueArray();
+        final Map<Object, Object> map = new HashMap<>(mapData.size());
+        for (int i = 0; i < mapData.size(); ++i) {
+          final String key = keyArray.getString(i).toString();
+          final Object value =
+              valueConverter.convert(
+                  valueSchema, valueGetter.getElementOrNull(valueArray, i));
+          map.put(key, value);
+        }
+        return map;
+      }
+    };
+  }
+}
+
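
A minimal usage sketch for the converter above (imports omitted; the Avro schema is assumed to structurally match the Flink row type, which is the caller's responsibility):

    // build a converter for a two-field row: f0 STRING, f1 INT
    RowType rowType = RowType.of(new VarCharType(20), new IntType());
    RowDataToAvroConverters.RowDataToAvroConverter converter =
        RowDataToAvroConverters.createConverter(rowType);

    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
            + "{\"name\":\"f0\",\"type\":\"string\"},{\"name\":\"f1\",\"type\":\"int\"}]}");

    // convert a Flink internal row into an Avro GenericRecord
    RowData row = GenericRowData.of(StringData.fromString("id1"), 23);
    GenericRecord record = (GenericRecord) converter.convert(schema, row);
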
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 71de651..9460ee8 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -18,47 +18,71 @@
 
 package org.apache.hudi.util;
 
-import org.apache.hudi.HoodieFlinkStreamer;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.keygen.KeyGenerator;
-import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.operator.FlinkOptions;
 import org.apache.hudi.schema.FilebasedSchemaProvider;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.File;
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.List;
+import java.util.Properties;
 
+/**
+ * Utilities for Flink stream read and write.
+ */
 public class StreamerUtil {
 
-  private static Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);
+  private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);
+
+  public static TypedProperties appendKafkaProps(FlinkStreamerConfig config) {
+    TypedProperties properties = getProps(config);
+    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers);
+    properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.kafkaGroupId);
+    return properties;
+  }
 
-  public static TypedProperties getProps(HoodieFlinkStreamer.Config cfg) {
+  public static TypedProperties getProps(FlinkStreamerConfig cfg) {
     return readConfig(
         FSUtils.getFs(cfg.propsFilePath, getHadoopConf()),
         new Path(cfg.propsFilePath), cfg.configs).getConfig();
   }
 
+  public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
+    return new FilebasedSchemaProvider(FlinkOptions.fromStreamerConfig(cfg)).getSourceSchema();
+  }
+
+  public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
+    return new FilebasedSchemaProvider(conf).getSourceSchema();
+  }
 
   /**
-   * Read conig from files.
+   * Read config from properties file (`--props` option) and cmd line (`--hoodie-conf` option).
    */
   public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
     DFSPropertiesConfiguration conf;
@@ -81,16 +105,50 @@ public class StreamerUtil {
     return conf;
   }
 
-  public static Configuration getHadoopConf() {
-    return new Configuration();
+  public static org.apache.hadoop.conf.Configuration getHadoopConf() {
+    // create hadoop configuration with hadoop conf directory configured.
+    org.apache.hadoop.conf.Configuration hadoopConf = null;
+    for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new Configuration())) {
+      hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
+      if (hadoopConf != null) {
+        break;
+      }
+    }
+    if (hadoopConf == null) {
+      hadoopConf = new org.apache.hadoop.conf.Configuration();
+    }
+    return hadoopConf;
   }
 
-  public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
-    checkPropNames.forEach(prop -> {
-      if (!props.containsKey(prop)) {
-        throw new HoodieNotSupportedException("Required property " + prop + " is missing");
+  /**
+   * Returns a new Hadoop Configuration object built from the given Hadoop conf directory.
+   *
+   * @param hadoopConfDir Hadoop conf directory path.
+   * @return A Hadoop configuration instance.
+   */
+  private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) {
+    if (new File(hadoopConfDir).exists()) {
+      org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+      File coreSite = new File(hadoopConfDir, "core-site.xml");
+      if (coreSite.exists()) {
+        hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath()));
+      }
+      File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
+      if (hdfsSite.exists()) {
+        hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath()));
+      }
+      File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
+      if (yarnSite.exists()) {
+        hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath()));
+      }
+      // Add mapred-site.xml. We need to read configurations like compression codec.
+      File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
+      if (mapredSite.exists()) {
+        hadoopConfiguration.addResource(new Path(mapredSite.getAbsolutePath()));
       }
-    });
+      return hadoopConfiguration;
+    }
+    return null;
   }
 
   /**
@@ -110,6 +168,21 @@ public class StreamerUtil {
   }
 
   /**
+   * Create a key generator class via reflection, passing in any configs needed.
+   * <p>
+   * If the key generator class name is configured through the Flink configuration, i.e., {@code conf}, use that key generator class; otherwise, use the default key generator class
+   * specified in {@link FlinkOptions}.
+   */
+  public static KeyGenerator createKeyGenerator(Configuration conf) throws IOException {
+    String keyGeneratorClass = conf.getString(FlinkOptions.KEYGEN_CLASS);
+    try {
+      return (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, flinkConf2TypedProperties(conf));
+    } catch (Throwable e) {
+      throw new IOException("Could not load key generator class " + keyGeneratorClass, e);
+    }
+  }
+
+  /**
    * Create a payload class via reflection, passing in an ordering/precombine value.
    */
   public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
@@ -122,21 +195,59 @@ public class StreamerUtil {
     }
   }
 
-  public static HoodieWriteConfig getHoodieClientConfig(HoodieFlinkStreamer.Config cfg) {
-    FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf());
+  /**
+   * Create a payload class via reflection, without an ordering/precombine value.
+   */
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record)
+      throws IOException {
+    try {
+      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
+          new Class<?>[] {Option.class}, Option.of(record));
+    } catch (Throwable e) {
+      throw new IOException("Could not create payload for class: " + payloadClass, e);
+    }
+  }
+
+  public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
     HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
-            .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
-                .build())
-            .forTable(cfg.targetTableName)
+        HoodieWriteConfig.newBuilder()
+            .withEngineType(EngineType.FLINK)
+            .withPath(conf.getString(FlinkOptions.PATH))
+            .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
+            .withCompactionConfig(
+                HoodieCompactionConfig.newBuilder()
+                    .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
+                    .build())
+            .forTable(conf.getString(FlinkOptions.TABLE_NAME))
             .withAutoCommit(false)
-            .withProps(readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
-                .getConfig());
+            .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
+
+    builder = builder.withSchema(getSourceSchema(conf).toString());
+    return builder.build();
+  }
 
-    builder = builder.withSchema(new FilebasedSchemaProvider(getProps(cfg)).getTargetSchema().toString());
-    HoodieWriteConfig config = builder.build();
-    return config;
+  /**
+   * Converts the given {@link Configuration} to {@link TypedProperties}.
+   * The default values are also set up.
+   *
+   * @param conf The flink configuration
+   * @return a TypedProperties instance
+   */
+  public static TypedProperties flinkConf2TypedProperties(Configuration conf) {
+    Properties properties = new Properties();
+    // put all the explicitly set options
+    conf.addAllToProperties(properties);
+    // put all the default options
+    for (ConfigOption<?> option : FlinkOptions.OPTIONAL_OPTIONS) {
+      if (!conf.contains(option)) {
+        properties.put(option.key(), option.defaultValue());
+      }
+    }
+    return new TypedProperties(properties);
   }
 
+  public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
+    checkPropNames.forEach(prop ->
+        Preconditions.checkState(props.containsKey(prop), "Required property " + prop + " is missing"));
+  }
 }
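
A short, hypothetical sketch of how the configuration-driven helpers above fit together (imports and checked exceptions omitted; the FlinkOptions keys are assumed from the option class added in this patch, and getHoodieClientConfig additionally requires the source schema option so that FilebasedSchemaProvider can resolve it):

    // Flink configuration carrying the Hudi write options
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "/tmp/hoodie/t1");
    conf.setString(FlinkOptions.TABLE_NAME, "t1");

    // flatten the options (defaults included) into TypedProperties for the Hudi client
    TypedProperties props = StreamerUtil.flinkConf2TypedProperties(FlinkOptions.flatOptions(conf));

    // reflection-based helpers driven by the same configuration
    KeyGenerator keyGenerator = StreamerUtil.createKeyGenerator(conf);
    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf);
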
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
new file mode 100644
index 0000000..c2d7a65
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.hudi.operator.utils.StreamWriteFunctionWrapper;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.data.RowData;
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.operator.utils.TestData.checkWrittenData;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for StreamWriteFunction.
+ */
+public class StreamWriteFunctionTest {
+
+  private static final Map<String, String> EXPECTED = new HashMap<>();
+
+  static {
+    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1,par1, id2,par1,id2,Stephen,33,2,par1]");
+    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3,par2, id4,par2,id4,Fabian,31,4,par2]");
+    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3]");
+    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+  }
+
+  private static final Map<String, String> EXPECTED2 = new HashMap<>();
+
+  static {
+    EXPECTED2.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
+    EXPECTED2.put("par2", "[id3,par2,id3,Julian,54,3,par2, id4,par2,id4,Fabian,32,4,par2]");
+    EXPECTED2.put("par3", "[id5,par3,id5,Sophia,18,5,par3, id6,par3,id6,Emma,20,6,par3, "
+        + "id9,par3,id9,Jane,19,6,par3]");
+    EXPECTED2.put("par4", "[id10,par4,id10,Ella,38,7,par4, id11,par4,id11,Phoebe,52,8,par4, "
+        + "id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+  }
+
+  private StreamWriteFunctionWrapper<RowData> funcWrapper;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws Exception {
+    this.funcWrapper = new StreamWriteFunctionWrapper<>(
+        tempFile.getAbsolutePath(),
+        TestConfigurations.SERIALIZER);
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    funcWrapper.close();
+  }
+
+  @Test
+  public void testCheckpoint() throws Exception {
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_ONE) {
+      funcWrapper.invoke(rowData);
+    }
+
+    // no checkpoint, so the coordinator does not accept any events
+    assertTrue(
+        funcWrapper.getEventBuffer().length == 1
+            && funcWrapper.getEventBuffer()[0] == null, "The coordinator event buffer is expected to be empty");
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    String instant = funcWrapper.getWriteClient()
+        .getInflightAndRequestedInstant("COPY_ON_WRITE");
+
+    final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    List<WriteStatus> writeStatuses = ((BatchWriteSuccessEvent) nextEvent).getWriteStatuses();
+    assertNotNull(writeStatuses);
+    MatcherAssert.assertThat(writeStatuses.size(), is(4)); // write 4 partition files
+    assertThat(writeStatuses.stream()
+            .map(WriteStatus::getPartitionPath).sorted(Comparator.naturalOrder())
+            .collect(Collectors.joining(",")),
+        is("par1,par2,par3,par4"));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
+    funcWrapper.checkpointComplete(1);
+    // the coordinator checkpoint commits the inflight instant.
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+
+    // checkpoint for the next round with no data input, so after the checkpoint
+    // there should be no REQUESTED instant
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(2);
+
+    String instant2 = funcWrapper.getWriteClient()
+        .getInflightAndRequestedInstant("COPY_ON_WRITE");
+    assertNotEquals(instant, instant2);
+
+    final OperatorEvent nextEvent2 = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", nextEvent2, instanceOf(BatchWriteSuccessEvent.class));
+    List<WriteStatus> writeStatuses2 = ((BatchWriteSuccessEvent) nextEvent2).getWriteStatuses();
+    assertNotNull(writeStatuses2);
+    assertThat(writeStatuses2.size(), is(0)); // write empty statuses
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent2);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    funcWrapper.checkpointComplete(2);
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null);
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+  }
+
+  @Test
+  public void testCheckpointFails() throws Exception {
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    // no data is written and the checkpoint fails,
+    // so the started instant should be reverted
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    String instant = funcWrapper.getWriteClient()
+        .getInflightAndRequestedInstant("COPY_ON_WRITE");
+    assertNotNull(instant);
+
+    final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    List<WriteStatus> writeStatuses = ((BatchWriteSuccessEvent) nextEvent).getWriteStatuses();
+    assertNotNull(writeStatuses);
+    assertThat(writeStatuses.size(), is(0)); // no data write
+
+    // fails the checkpoint
+    assertThrows(HoodieException.class,
+        () -> funcWrapper.checkpointFails(1),
+        "The last checkpoint was aborted, roll back the last write and throw");
+
+    // the instant metadata should be cleared
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null);
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null);
+
+    for (RowData rowData : TestData.DATA_SET_ONE) {
+      funcWrapper.invoke(rowData);
+    }
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(2);
+    // Do not send the write event and fail the checkpoint
+    assertThrows(HoodieException.class,
+        () -> funcWrapper.checkpointFails(2),
+        "The last checkpoint was aborted, roll back the last write and throw");
+  }
+
+  @Test
+  public void testInsert() throws Exception {
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_ONE) {
+      funcWrapper.invoke(rowData);
+    }
+
+    assertEmptyDataFiles();
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    String instant = funcWrapper.getWriteClient()
+        .getInflightAndRequestedInstant("COPY_ON_WRITE");
+
+    final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    checkWrittenData(tempFile, EXPECTED);
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
+    funcWrapper.checkpointComplete(1);
+    // the coordinator checkpoint commits the inflight instant.
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+    checkWrittenData(tempFile, EXPECTED);
+  }
+
+  @Test
+  public void testUpsert() throws Exception {
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_ONE) {
+      funcWrapper.invoke(rowData);
+    }
+
+    assertEmptyDataFiles();
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    funcWrapper.checkpointComplete(1);
+
+    // upsert another data buffer
+    for (RowData rowData : TestData.DATA_SET_TWO) {
+      funcWrapper.invoke(rowData);
+    }
+    // the data is not flushed yet
+    checkWrittenData(tempFile, EXPECTED);
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(2);
+
+    String instant = funcWrapper.getWriteClient()
+        .getInflightAndRequestedInstant("COPY_ON_WRITE");
+
+    nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class));
+    checkWrittenData(tempFile, EXPECTED2);
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
+    funcWrapper.checkpointComplete(2);
+    // the coordinator checkpoint commits the inflight instant.
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+    checkWrittenData(tempFile, EXPECTED2);
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  @SuppressWarnings("rawtypes")
+  private void checkInstantState(
+      HoodieFlinkWriteClient writeClient,
+      HoodieInstant.State state,
+      String instantStr) {
+    final String instant;
+    switch (state) {
+      case REQUESTED:
+        instant = writeClient.getInflightAndRequestedInstant("COPY_ON_WRITE");
+        break;
+      case COMPLETED:
+        instant = writeClient.getLastCompletedInstant("COPY_ON_WRITE");
+        break;
+      default:
+        throw new AssertionError("Unexpected state");
+    }
+    assertThat(instant, is(instantStr));
+  }
+
+  /**
+   * Asserts that no data files have been written under the base path.
+   */
+  private void assertEmptyDataFiles() {
+    File[] dataFiles = tempFile.listFiles(file -> !file.getName().startsWith("."));
+    assertNotNull(dataFiles);
+    assertThat(dataFiles.length, is(0));
+  }
+}
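
The exactly-once protocol the tests above exercise can be condensed into the following sequence over the same test wrapper (a sketch, not an additional test case): the function only buffers records, the checkpoint flushes them and emits an event, and the coordinator commits the instant once the checkpoint completes.

    funcWrapper.openFunction();                  // a new instant is started on the timeline
    for (RowData row : TestData.DATA_SET_ONE) {
      funcWrapper.invoke(row);                   // records are buffered inside the function
    }
    funcWrapper.checkpointFunction(1);           // flush the buffer and emit a BatchWriteSuccessEvent
    OperatorEvent event = funcWrapper.getNextEvent();
    funcWrapper.getCoordinator().handleEventFromOperator(0, event); // coordinator collects the write statuses
    funcWrapper.checkpointComplete(1);           // coordinator commits the inflight instant
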
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
new file mode 100644
index 0000000..56f946b
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.TimestampFormat;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.TestLogger;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test for Flink Hoodie stream sink.
+ */
+public class StreamWriteITCase extends TestLogger {
+
+  private static final Map<String, String> EXPECTED = new HashMap<>();
+
+  static {
+    EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1, id2,par1,id2,Stephen,33,2000,par1]");
+    EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2, id4,par2,id4,Fabian,31,4000,par2]");
+    EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3, id6,par3,id6,Emma,20,6000,par3]");
+    EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4, id8,par4,id8,Han,56,8000,par4]");
+  }
+
+  @TempDir
+  File tempFile;
+
+  @Test
+  public void testWriteToHoodie() throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(4);
+    // checkpoint every 2 seconds
+    execEnv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
+
+    // Derive the row type from the source Avro schema
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+    StreamWriteOperatorFactory<RowData> operatorFactory =
+        new StreamWriteOperatorFactory<>(rowType, conf, 4);
+
+    int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD));
+    final RowData.FieldGetter partitionFieldGetter =
+        RowData.createFieldGetter(rowType.getTypeAt(partitionFieldIndex), partitionFieldIndex);
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        new RowDataTypeInfo(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+    format.setFilesFilter(FilePathFilter.createDefaultFilter());
+    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+    format.setCharsetName("UTF-8");
+
+    DataStream<Object> dataStream = execEnv
+        // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+        .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        // Key-by partition path, to avoid multiple subtasks writing to the same partition at the same time
+        .keyBy(partitionFieldGetter::getFieldOrNull)
+        .transform("hoodie_stream_write", null, operatorFactory)
+        .uid("uid_hoodie_stream_write")
+        .setParallelism(4);
+    execEnv.addOperator(dataStream.getTransformation());
+
+    JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
+    if (client.getJobStatus().get() != JobStatus.FAILED) {
+      try {
+        TimeUnit.SECONDS.sleep(10);
+        client.cancel();
+      } catch (Throwable ignored) {
+        // ignored
+      }
+    }
+
+    TestData.checkWrittenData(tempFile, EXPECTED);
+  }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java
new file mode 100644
index 0000000..c533b48
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for StreamWriteOperatorCoordinator.
+ */
+public class StreamWriteOperatorCoordinatorTest {
+  private StreamWriteOperatorCoordinator coordinator;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws Exception {
+    coordinator = new StreamWriteOperatorCoordinator(
+        TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2);
+    coordinator.start();
+  }
+
+  @AfterEach
+  public void after() {
+    coordinator.close();
+  }
+
+  @Test
+  public void testTableInitialized() throws IOException {
+    final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
+    String basePath = tempFile.getAbsolutePath();
+    try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
+      assertTrue(fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)));
+    }
+  }
+
+  @Test
+  public void testCheckpointAndRestore() throws Exception {
+    CompletableFuture<byte[]> future = new CompletableFuture<>();
+    coordinator.checkpointCoordinator(1, future);
+    coordinator.resetToCheckpoint(future.get());
+  }
+
+  @Test
+  public void testReceiveInvalidEvent() {
+    CompletableFuture<byte[]> future = new CompletableFuture<>();
+    coordinator.checkpointCoordinator(1, future);
+    OperatorEvent event = new BatchWriteSuccessEvent(0, "abc", Collections.emptyList());
+    assertThrows(IllegalStateException.class,
+        () -> coordinator.handleEventFromOperator(0, event),
+        "Receive an unexpected event for instant abc from task 0");
+  }
+
+  @Test
+  public void testCheckpointInvalid() {
+    final CompletableFuture<byte[]> future = new CompletableFuture<>();
+    coordinator.checkpointCoordinator(1, future);
+    String inflightInstant = coordinator.getInFlightInstant();
+    OperatorEvent event = new BatchWriteSuccessEvent(0, inflightInstant, Collections.emptyList());
+    coordinator.handleEventFromOperator(0, event);
+    final CompletableFuture<byte[]> future2 = new CompletableFuture<>();
+    coordinator.checkpointCoordinator(2, future2);
+    assertTrue(future2.isCompletedExceptionally());
+  }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java
new file mode 100644
index 0000000..40e58fe
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockFunctionInitializationContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+/**
+ * A {@link FunctionInitializationContext} for testing purpose.
+ */
+public class MockFunctionInitializationContext implements FunctionInitializationContext {
+
+  private final MockOperatorStateStore operatorStateStore;
+
+  public MockFunctionInitializationContext() {
+    operatorStateStore = new MockOperatorStateStore();
+  }
+
+  @Override
+  public boolean isRestored() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public MockOperatorStateStore getOperatorStateStore() {
+    return operatorStateStore;
+  }
+
+  @Override
+  public KeyedStateStore getKeyedStateStore() {
+    return operatorStateStore;
+  }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockMapState.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockMapState.java
new file mode 100644
index 0000000..9697298
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockMapState.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.flink.api.common.state.MapState;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Mock map state for testing.
+ *
+ * @param <K> Type of state key
+ * @param <V> Type of state value
+ */
+public class MockMapState<K, V> implements MapState<K, V> {
+  private final Map<K, V> backingMap = new HashMap<>();
+
+  @Override
+  public V get(K uk) {
+    return backingMap.get(uk);
+  }
+
+  @Override
+  public void put(K uk, V uv) {
+    backingMap.put(uk, uv);
+  }
+
+  @Override
+  public void putAll(Map<K, V> map) {
+    backingMap.putAll(map);
+  }
+
+  @Override
+  public void remove(K uk) {
+    backingMap.remove(uk);
+  }
+
+  @Override
+  public boolean contains(K uk) {
+    return backingMap.containsKey(uk);
+  }
+
+  @Override
+  public Iterable<Map.Entry<K, V>> entries() {
+    return backingMap.entrySet();
+  }
+
+  @Override
+  public Iterable<K> keys() {
+    return backingMap.keySet();
+  }
+
+  @Override
+  public Iterable<V> values() {
+    return backingMap.values();
+  }
+
+  @Override
+  public Iterator<Map.Entry<K, V>> iterator() {
+    return backingMap.entrySet().iterator();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return backingMap.isEmpty();
+  }
+
+  @Override
+  public void clear() {
+    backingMap.clear();
+  }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockOperatorStateStore.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockOperatorStateStore.java
new file mode 100644
index 0000000..016ad5b
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockOperatorStateStore.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An {@link OperatorStateStore} for testing purpose.
+ */
+@SuppressWarnings("rawtypes")
+public class MockOperatorStateStore implements KeyedStateStore, OperatorStateStore {
+
+  private final Map<Long, Map<String, TestUtils.MockListState>> historyStateMap;
+
+  private Map<String, TestUtils.MockListState> currentStateMap;
+  private Map<String, TestUtils.MockListState> lastSuccessStateMap;
+
+  private MapState mapState;
+
+  public MockOperatorStateStore() {
+    this.historyStateMap = new HashMap<>();
+
+    this.currentStateMap = new HashMap<>();
+    this.lastSuccessStateMap = new HashMap<>();
+
+    this.mapState = new MockMapState<>();
+  }
+
+  @Override
+  public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
+    return null;
+  }
+
+  @Override
+  public <T> ValueState<T> getState(ValueStateDescriptor<T> valueStateDescriptor) {
+    return null;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) {
+    String name = stateDescriptor.getName();
+    currentStateMap.putIfAbsent(name, new TestUtils.MockListState());
+    return currentStateMap.get(name);
+  }
+
+  @Override
+  public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> reducingStateDescriptor) {
+    return null;
+  }
+
+  @Override
+  public <I, A, O> AggregatingState<I, O> getAggregatingState(AggregatingStateDescriptor<I, A, O> aggregatingStateDescriptor) {
+    return null;
+  }
+
+  @Override
+  public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> foldingStateDescriptor) {
+    return null;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <K, V> MapState<K, V> getMapState(MapStateDescriptor<K, V> mapStateDescriptor) {
+    return this.mapState;
+  }
+
+  @Override
+  public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<String> getRegisteredStateNames() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<String> getRegisteredBroadcastStateNames() {
+    throw new UnsupportedOperationException();
+  }
+
+  public void checkpointBegin(long checkpointId) {
+    Map<String, TestUtils.MockListState> copiedStates = Collections.unmodifiableMap(copyStates(currentStateMap));
+    historyStateMap.put(checkpointId, copiedStates);
+  }
+
+  public void checkpointSuccess(long checkpointId) {
+    lastSuccessStateMap = historyStateMap.get(checkpointId);
+  }
+
+  public void rollBackToLastSuccessCheckpoint() {
+    this.currentStateMap = copyStates(lastSuccessStateMap);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<String, TestUtils.MockListState> copyStates(Map<String, TestUtils.MockListState> stateMap) {
+    Map<String, TestUtils.MockListState> copiedStates = new HashMap<>();
+    for (Map.Entry<String, TestUtils.MockListState> entry : stateMap.entrySet()) {
+      TestUtils.MockListState copiedState = new TestUtils.MockListState();
+      copiedState.addAll(entry.getValue().getBackingList());
+      copiedStates.put(entry.getKey(), copiedState);
+    }
+    return copiedStates;
+  }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockStreamingRuntimeContext.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockStreamingRuntimeContext.java
new file mode 100644
index 0000000..1db98df
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/MockStreamingRuntimeContext.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+
+import java.util.HashMap;
+
+/**
+ * Mock {@link StreamingRuntimeContext} to use in tests.
+ *
+ * <p>NOTE: Adapted from Apache Flink, the MockStreamOperator is modified to support MapState.
+ */
+public class MockStreamingRuntimeContext extends StreamingRuntimeContext {
+
+  private final boolean isCheckpointingEnabled;
+
+  private final int numParallelSubtasks;
+  private final int subtaskIndex;
+
+  public MockStreamingRuntimeContext(
+      boolean isCheckpointingEnabled,
+      int numParallelSubtasks,
+      int subtaskIndex) {
+
+    this(isCheckpointingEnabled, numParallelSubtasks, subtaskIndex, new MockEnvironmentBuilder()
+        .setTaskName("mockTask")
+        .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+        .build());
+  }
+
+  public MockStreamingRuntimeContext(
+      boolean isCheckpointingEnabled,
+      int numParallelSubtasks,
+      int subtaskIndex,
+      MockEnvironment environment) {
+
+    super(new MockStreamOperator(), environment, new HashMap<>());
+
+    this.isCheckpointingEnabled = isCheckpointingEnabled;
+    this.numParallelSubtasks = numParallelSubtasks;
+    this.subtaskIndex = subtaskIndex;
+  }
+
+  @Override
+  public MetricGroup getMetricGroup() {
+    return new UnregisteredMetricsGroup();
+  }
+
+  @Override
+  public boolean isCheckpointingEnabled() {
+    return isCheckpointingEnabled;
+  }
+
+  @Override
+  public int getIndexOfThisSubtask() {
+    return subtaskIndex;
+  }
+
+  @Override
+  public int getNumberOfParallelSubtasks() {
+    return numParallelSubtasks;
+  }
+
+  private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
+    private static final long serialVersionUID = -1153976702711944427L;
+
+    private transient TestProcessingTimeService testProcessingTimeService;
+
+    private transient MockOperatorStateStore mockOperatorStateStore;
+
+    @Override
+    public ExecutionConfig getExecutionConfig() {
+      return new ExecutionConfig();
+    }
+
+    @Override
+    public OperatorID getOperatorID() {
+      return new OperatorID();
+    }
+
+    @Override
+    public ProcessingTimeService getProcessingTimeService() {
+      if (testProcessingTimeService == null) {
+        testProcessingTimeService = new TestProcessingTimeService();
+      }
+      return testProcessingTimeService;
+    }
+
+    @Override
+    public KeyedStateStore getKeyedStateStore() {
+      if (mockOperatorStateStore == null) {
+        mockOperatorStateStore = new MockOperatorStateStore();
+      }
+      return mockOperatorStateStore;
+    }
+  }
+}
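
Side note (not part of this commit): the MapState support mentioned in the class comment is what allows a rich function that requests map state in open() to be unit-tested without a real keyed stream. A minimal sketch under that assumption follows; BucketTrackingFunction and its test are hypothetical names used only for illustration, relying on Flink's RichMapFunction/MapStateDescriptor APIs and JUnit 5.

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.operator.utils.MockStreamingRuntimeContext;
    import org.junit.jupiter.api.Test;

    // Hypothetical function, used only to illustrate the MapState support.
    class BucketTrackingFunction extends RichMapFunction<String, String> {
      private transient MapState<String, String> buckets;

      @Override
      public void open(Configuration parameters) {
        // StreamingRuntimeContext#getMapState resolves state through the operator's
        // keyed state store, which this mock backs with an in-memory MapState.
        buckets = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("buckets", Types.STRING, Types.STRING));
      }

      @Override
      public String map(String value) throws Exception {
        buckets.put(value, value);
        return value;
      }
    }

    class BucketTrackingFunctionTest {
      @Test
      void openAgainstMockContext() throws Exception {
        BucketTrackingFunction function = new BucketTrackingFunction();
        function.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0));
        function.open(new Configuration());
        // State calls are now served by the mock store; no keyed stream is needed.
        function.map("id1");
      }
    }
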
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
new file mode 100644
index 0000000..1b02791
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
+import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.operator.StreamWriteFunction;
+import org.apache.hudi.operator.StreamWriteOperatorCoordinator;
+import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class to manipulate the {@link StreamWriteFunction} instance for testing.
+ *
+ * @param <I> Input type
+ */
+public class StreamWriteFunctionWrapper<I> {
+  private final TypeSerializer<I> serializer;
+  private final Configuration conf;
+
+  private final IOManager ioManager;
+  private final StreamingRuntimeContext runtimeContext;
+  private final MockOperatorEventGateway gateway;
+  private final StreamWriteOperatorCoordinator coordinator;
+  private final MockFunctionInitializationContext functionInitializationContext;
+
+  private StreamWriteFunction<Object, I, Object> function;
+
+  public StreamWriteFunctionWrapper(String tablePath, TypeSerializer<I> serializer) throws Exception {
+    this.serializer = serializer;
+    this.ioManager = new IOManagerAsync();
+    MockEnvironment environment = new MockEnvironmentBuilder()
+        .setTaskName("mockTask")
+        .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+        .setIOManager(ioManager)
+        .build();
+    this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+    this.gateway = new MockOperatorEventGateway();
+    this.conf = TestConfigurations.getDefaultConf(tablePath);
+    // a single write function instance
+    this.coordinator = new StreamWriteOperatorCoordinator(conf, 1);
+    this.coordinator.start();
+    this.functionInitializationContext = new MockFunctionInitializationContext();
+  }
+
+  public void openFunction() throws Exception {
+    function = new StreamWriteFunction<>(TestConfigurations.ROW_TYPE, this.conf);
+    function.setRuntimeContext(runtimeContext);
+    function.setOperatorEventGateway(gateway);
+    function.open(this.conf);
+  }
+
+  public void invoke(I record) throws Exception {
+    function.processElement(record, null, null);
+  }
+
+  public BatchWriteSuccessEvent[] getEventBuffer() {
+    return this.coordinator.getEventBuffer();
+  }
+
+  public OperatorEvent getNextEvent() {
+    return this.gateway.getNextEvent();
+  }
+
+  @SuppressWarnings("rawtypes")
+  public HoodieFlinkWriteClient getWriteClient() {
+    return this.function.getWriteClient();
+  }
+
+  public void checkpointFunction(long checkpointId) throws Exception {
+    // checkpoint the coordinator first
+    this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
+    function.snapshotState(new MockFunctionSnapshotContext(checkpointId));
+    functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
+  }
+
+  public void checkpointComplete(long checkpointId) {
+    functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
+    coordinator.checkpointComplete(checkpointId);
+  }
+
+  public void checkpointFails(long checkpointId) {
+    coordinator.notifyCheckpointAborted(checkpointId);
+  }
+
+  public void close() throws Exception {
+    coordinator.close();
+    ioManager.close();
+  }
+
+  public StreamWriteOperatorCoordinator getCoordinator() {
+    return coordinator;
+  }
+}
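
For orientation, a hedged sketch of how a test might drive this wrapper through one write/checkpoint/ack cycle. The temp-directory handling, the test class name and the manual forwarding of the write-success event to the coordinator via handleEventFromOperator are assumptions for illustration, not a claim about how the bundled tests are written.

    import java.nio.file.Files;

    import org.apache.flink.runtime.operators.coordination.OperatorEvent;
    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.operator.utils.StreamWriteFunctionWrapper;
    import org.apache.hudi.operator.utils.TestConfigurations;
    import org.apache.hudi.operator.utils.TestData;
    import org.junit.jupiter.api.Test;

    class StreamWriteFunctionWrapperUsageSketch {
      @Test
      void writeThenCheckpoint() throws Exception {
        String tablePath = Files.createTempDirectory("hudi-flink-ut").toString();
        StreamWriteFunctionWrapper<RowData> wrapper =
            new StreamWriteFunctionWrapper<>(tablePath, TestConfigurations.SERIALIZER);
        wrapper.openFunction();
        for (RowData row : TestData.DATA_SET_ONE) {
          wrapper.invoke(row);
        }
        // Simulate a successful Flink checkpoint: snapshot the function, forward the
        // write-success event to the coordinator, then acknowledge the checkpoint.
        wrapper.checkpointFunction(1);
        OperatorEvent event = wrapper.getNextEvent();
        wrapper.getCoordinator().handleEventFromOperator(0, event);
        wrapper.checkpointComplete(1);
        wrapper.close();
      }
    }
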
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
new file mode 100644
index 0000000..7513fed
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestConfigurations.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.hudi.operator.FlinkOptions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+/**
+ * Configurations for the test.
+ */
+public class TestConfigurations {
+  private TestConfigurations() {
+  }
+
+  public static final RowType ROW_TYPE = (RowType) DataTypes.ROW(
+      DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
+      DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+      DataTypes.FIELD("age", DataTypes.INT()),
+      DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
+      DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+      .notNull()
+      .getLogicalType();
+
+  public static final RowDataSerializer SERIALIZER = new RowDataSerializer(new ExecutionConfig(), ROW_TYPE);
+
+  public static Configuration getDefaultConf(String tablePath) {
+    Configuration conf = new Configuration();
+    conf.setString(FlinkOptions.PATH, tablePath);
+    conf.setString(FlinkOptions.READ_SCHEMA_FILE_PATH,
+        Objects.requireNonNull(Thread.currentThread()
+            .getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+    conf.setString(FlinkOptions.TABLE_NAME, "TestHoodieTable");
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+    return conf;
+  }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
new file mode 100644
index 0000000..7c2c314
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/TestData.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.utils;
+
+import org.apache.hudi.common.fs.FSUtils;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.data.writer.BinaryWriter;
+import org.apache.flink.table.runtime.types.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.Strings;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/** Data sets for testing, along with utilities to check the written results. */
+public class TestData {
+  public static List<RowData> DATA_SET_ONE = Arrays.asList(
+      binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+      binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
+      binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 31,
+          TimestampData.fromEpochMillis(4), StringData.fromString("par2")),
+      binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+          TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
+      binaryRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
+          TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
+      binaryRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+          TimestampData.fromEpochMillis(7), StringData.fromString("par4")),
+      binaryRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+          TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
+  );
+
+  public static List<RowData> DATA_SET_TWO = Arrays.asList(
+      // advance the age by 1
+      binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+      binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
+      binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32,
+          TimestampData.fromEpochMillis(4), StringData.fromString("par2")),
+      // same as before
+      binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+          TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
+      // new data
+      binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
+          TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
+      binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38,
+          TimestampData.fromEpochMillis(7), StringData.fromString("par4")),
+      binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52,
+          TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
+  );
+
+  /**
+   * Checks that the source data set {@link #DATA_SET_ONE} is written as expected.
+   *
+   * <p>Note: Replace it with the Flink reader when it is supported.
+   *
+   * @param baseFile The table base directory to check
+   * @param expected The expected results mapping, where the key is the partition path
+   */
+  public static void checkWrittenData(File baseFile, Map<String, String> expected) throws IOException {
+    assert baseFile.isDirectory();
+    FileFilter filter = file -> !file.getName().startsWith(".");
+    File[] partitionDirs = baseFile.listFiles(filter);
+    assertNotNull(partitionDirs);
+    assertThat(partitionDirs.length, is(4));
+    for (File partitionDir : partitionDirs) {
+      File[] dataFiles = partitionDir.listFiles(file -> file.getName().endsWith(".parquet"));
+      assertNotNull(dataFiles);
+      File latestDataFile = Arrays.stream(dataFiles)
+          .max(Comparator.comparing(f -> FSUtils.getCommitTime(f.getName())))
+          .orElse(dataFiles[0]);
+      ParquetReader<GenericRecord> reader = AvroParquetReader
+          .<GenericRecord>builder(new Path(latestDataFile.getAbsolutePath())).build();
+      List<String> readBuffer = new ArrayList<>();
+      GenericRecord nextRecord = reader.read();
+      while (nextRecord != null) {
+        readBuffer.add(filterOutVariables(nextRecord));
+        nextRecord = reader.read();
+      }
+      readBuffer.sort(Comparator.naturalOrder());
+      assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
+    }
+  }
+
+  /**
+   * Filters out variable metadata fields such as the file name, keeping only the stable fields.
+   */
+  private static String filterOutVariables(GenericRecord genericRecord) {
+    List<String> fields = new ArrayList<>();
+    fields.add(genericRecord.get("_hoodie_record_key").toString());
+    fields.add(genericRecord.get("_hoodie_partition_path").toString());
+    fields.add(genericRecord.get("uuid").toString());
+    fields.add(genericRecord.get("name").toString());
+    fields.add(genericRecord.get("age").toString());
+    fields.add(genericRecord.get("ts").toString());
+    fields.add(genericRecord.get("partition").toString());
+    return Strings.join(fields, ",");
+  }
+
+  private static BinaryRowData binaryRow(Object... fields) {
+    LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType)
+        .toArray(LogicalType[]::new);
+    assertEquals(
+        "Filed count inconsistent with type information",
+        fields.length,
+        types.length);
+    BinaryRowData row = new BinaryRowData(fields.length);
+    BinaryRowWriter writer = new BinaryRowWriter(row);
+    writer.reset();
+    for (int i = 0; i < fields.length; i++) {
+      Object field = fields[i];
+      if (field == null) {
+        writer.setNullAt(i);
+      } else {
+        BinaryWriter.write(writer, i, field, types[i], InternalSerializers.create(types[i]));
+      }
+    }
+    writer.complete();
+    return row;
+  }
+}
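
To make the contract of checkWrittenData concrete, a hedged sketch of the expected map it consumes: each key is a partition directory name, and each value must equal the List#toString() rendering of the sorted per-record strings built by filterOutVariables(). The value strings below are placeholders, not output copied from a real run, and tablePath is assumed to come from an earlier write step.

    Map<String, String> expected = new HashMap<>();
    // One entry per partition directory; each value string must match exactly.
    expected.put("par1", "[...]"); // placeholder, not real output
    expected.put("par2", "[...]"); // placeholder
    expected.put("par3", "[...]"); // placeholder
    expected.put("par4", "[...]"); // placeholder
    TestData.checkWrittenData(new File(tablePath), expected);
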
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestJsonStringToHoodieRecordMapFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestJsonStringToHoodieRecordMapFunction.java
index 98066e9..a24f153 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestJsonStringToHoodieRecordMapFunction.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestJsonStringToHoodieRecordMapFunction.java
@@ -70,7 +70,7 @@ public class TestJsonStringToHoodieRecordMapFunction extends HoodieFlinkClientTe
     props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, OverwriteWithLatestAvroPayload.class.getName());
     props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "timestamp");
     props.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
-    props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionPath");
+    props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "current_date");
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setParallelism(2);
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java b/hudi-flink/src/test/resources/test_read_schema.avsc
similarity index 53%
copy from hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
copy to hudi-flink/src/test/resources/test_read_schema.avsc
index 74b4067..0cbb4e3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
+++ b/hudi-flink/src/test/resources/test_read_schema.avsc
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,30 +15,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.hudi.schema;
-
-import org.apache.hudi.common.config.TypedProperties;
-
-import org.apache.avro.Schema;
-
-import java.io.Serializable;
-
-/**
- * Class to provide schema for reading data and also writing into a Hoodie table.
- */
-public abstract class SchemaProvider implements Serializable {
-
-  protected TypedProperties config;
-
-  protected SchemaProvider(TypedProperties props) {
-    this.config = props;
-  }
-
-  public abstract Schema getSourceSchema();
-
-  public Schema getTargetSchema() {
-    // by default, use source schema as target for hoodie table as well
-    return getSourceSchema();
-  }
+{
+  "type" : "record",
+  "name" : "record",
+  "fields" : [ {
+    "name" : "uuid",
+    "type" : [ "null", "string" ],
+    "default" : null
+  }, {
+    "name" : "name",
+    "type" : [ "null", "string" ],
+    "default" : null
+  }, {
+    "name" : "age",
+    "type" : [ "null", "int" ],
+    "default" : null
+  }, {
+    "name" : "ts",
+    "type" : [ "null", {
+      "type" : "long",
+      "logicalType" : "timestamp-millis"
+    } ],
+    "default" : null
+  }, {
+    "name" : "partition",
+    "type" : [ "null", "string" ],
+    "default" : null
+  } ]
 }
diff --git a/hudi-flink/src/test/resources/test_source.data b/hudi-flink/src/test/resources/test_source.data
new file mode 100644
index 0000000..2f628e2
--- /dev/null
+++ b/hudi-flink/src/test/resources/test_source.data
@@ -0,0 +1,8 @@
+{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01T00:00:01", "partition": "par1"}
+{"uuid": "id2", "name": "Stephen", "age": 33, "ts": "1970-01-01T00:00:02", "partition": "par1"}
+{"uuid": "id3", "name": "Julian", "age": 53, "ts": "1970-01-01T00:00:03", "partition": "par2"}
+{"uuid": "id4", "name": "Fabian", "age": 31, "ts": "1970-01-01T00:00:04", "partition": "par2"}
+{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", "partition": "par3"}
+{"uuid": "id6", "name": "Emma", "age": 20, "ts": "1970-01-01T00:00:06", "partition": "par3"}
+{"uuid": "id7", "name": "Bob", "age": 44, "ts": "1970-01-01T00:00:07", "partition": "par4"}
+{"uuid": "id8", "name": "Han", "age": 56, "ts": "1970-01-01T00:00:08", "partition": "par4"}