Posted to commits@hudi.apache.org by vi...@apache.org on 2021/09/11 01:20:39 UTC

[hudi] branch master updated: [HUDI-2394] Implement Kafka Sink Protocol for Hudi for Ingesting Immutable Data (#3592)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e528dd7  [HUDI-2394] Implement Kafka Sink Protocol for Hudi for Ingesting Immutable Data (#3592)
e528dd7 is described below

commit e528dd798ab8ce6e4d444d2d771c107c503e8f25
Author: rmahindra123 <76...@users.noreply.github.com>
AuthorDate: Fri Sep 10 18:20:26 2021 -0700

    [HUDI-2394] Implement Kafka Sink Protocol for Hudi for Ingesting Immutable Data (#3592)
    
    
     - Fixing packaging, naming of classes
     - Use of log4j over slf4j for uniformity
     - More follow-on fixes
     - Added a version to control/coordinator events
     - Eliminated the config added to write config
     - Fixed fetching of checkpoints based on table type
     - Clean up of naming, code placement
    
    Co-authored-by: Rajesh Mahindra <rm...@Rajeshs-MacBook-Pro.local>
    Co-authored-by: Vinoth Chandar <vi...@apache.org>
---
 .gitignore                                         |   2 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  16 +
 .../apache/hudi/table/FileIdPrefixProvider.java    |  36 ++
 .../hudi/table/RandomFileIdPrefixProvider.java     |  35 ++
 .../apache/hudi/client/HoodieJavaWriteClient.java  |  20 +-
 .../table/action/commit/JavaBulkInsertHelper.java  |  25 +-
 .../apache/hudi/common/config/ConfigGroups.java    |   6 +-
 .../org/apache/hudi/common/util/CommitUtils.java   |   1 +
 .../hudi/common/testutils/SchemaTestUtil.java      |  34 +-
 hudi-kafka-connect/README.md                       |  94 +++++
 hudi-kafka-connect/configs/config-sink.json        |  19 +
 .../configs/connect-distributed.properties         |  33 ++
 hudi-kafka-connect/pom.xml                         | 231 ++++++++++++
 hudi-kafka-connect/scripts/raw.json                |   5 +
 .../scripts/runKafkaTrafficGenerator.sh            |  38 ++
 .../apache/hudi/connect/HoodieSinkConnector.java   |  83 +++++
 .../org/apache/hudi/connect/HoodieSinkTask.java    | 212 +++++++++++
 .../connect/KafkaConnectFileIdPrefixProvider.java  |  69 ++++
 .../connect/kafka/KafkaConnectControlAgent.java    | 230 ++++++++++++
 .../hudi/connect/kafka/KafkaControlAgent.java      |  41 +++
 .../hudi/connect/kafka/KafkaControlProducer.java   |  97 +++++
 .../transaction/ConnectTransactionCoordinator.java | 399 +++++++++++++++++++++
 .../transaction/ConnectTransactionParticipant.java | 254 +++++++++++++
 .../hudi/connect/transaction/ControlEvent.java     | 222 ++++++++++++
 .../hudi/connect/transaction/CoordinatorEvent.java |  71 ++++
 .../transaction/TransactionCoordinator.java        |  40 +++
 .../hudi/connect/transaction/TransactionInfo.java  |  65 ++++
 .../transaction/TransactionParticipant.java        |  45 +++
 .../hudi/connect/utils/KafkaConnectUtils.java      | 140 ++++++++
 .../connect/writers/AbstractConnectWriter.java     |  91 +++++
 .../connect/writers/BufferedConnectWriter.java     | 120 +++++++
 .../writers/ConnectTransactionServices.java        |  38 ++
 .../apache/hudi/connect/writers/ConnectWriter.java |  31 ++
 .../connect/writers/ConnectWriterProvider.java     |  24 ++
 .../hudi/connect/writers/KafkaConnectConfigs.java  | 177 +++++++++
 .../writers/KafkaConnectTransactionServices.java   | 133 +++++++
 .../writers/KafkaConnectWriterProvider.java        | 105 ++++++
 .../connect/TestConnectTransactionCoordinator.java | 253 +++++++++++++
 .../connect/TestConnectTransactionParticipant.java | 273 ++++++++++++++
 .../helper/MockConnectTransactionServices.java     |  57 +++
 .../apache/hudi/helper/MockKafkaControlAgent.java  |  87 +++++
 .../apache/hudi/helper/TestHudiWriterProvider.java |  83 +++++
 .../org/apache/hudi/helper/TestKafkaConnect.java   | 133 +++++++
 .../hudi/writers/TestAbstractConnectWriter.java    | 203 +++++++++++
 .../hudi/writers/TestBufferedConnectWriter.java    | 101 ++++++
 .../src/test/resources/log4j-surefire.properties   |  32 ++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   1 +
 .../hudi/utilities/schema/SchemaProvider.java      |   3 +-
 packaging/hudi-kafka-connect-bundle/pom.xml        | 186 ++++++++++
 .../org/apache/hudi/kafka/connect/bundle/Main.java |  36 ++
 pom.xml                                            |   2 +
 51 files changed, 4710 insertions(+), 22 deletions(-)

diff --git a/.gitignore b/.gitignore
index fcd673b..413e0a0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -78,4 +78,4 @@ dependency-reduced-pom.xml
 #######################################
 hudi-integ-test/compose_env
 node_modules
-package-lock.json
\ No newline at end of file
+package-lock.json
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index bbe6b10..4df7d0d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -46,6 +46,7 @@ import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
+import org.apache.hudi.table.RandomFileIdPrefixProvider;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
@@ -413,6 +414,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. "
           + "Once enabled, all the changes of a record are persisted to the delta log directly without merge");
 
+  public static final ConfigProperty<String> FILEID_PREFIX_PROVIDER_CLASS = ConfigProperty
+      .key("hoodie.fileid.prefix.provider.class")
+      .defaultValue(RandomFileIdPrefixProvider.class.getName())
+      .sinceVersion("0.10.0")
+      .withDocumentation("File Id Prefix provider class, that implements `org.apache.hudi.fileid.FileIdPrefixProvider`");
+
   private ConsistencyGuardConfig consistencyGuardConfig;
 
   // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -1748,6 +1755,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD);
   }
 
+  public String getFileIdPrefixProviderClassName() {
+    return getString(FILEID_PREFIX_PROVIDER_CLASS);
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -2079,6 +2090,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withFileIdPrefixProviderClassName(String fileIdPrefixProviderClassName) {
+      writeConfig.setValue(FILEID_PREFIX_PROVIDER_CLASS, fileIdPrefixProviderClassName);
+      return this;
+    }
+
     public Builder withProperties(Properties properties) {
       this.writeConfig.getProps().putAll(properties);
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/FileIdPrefixProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/FileIdPrefixProvider.java
new file mode 100644
index 0000000..d06da9b
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/FileIdPrefixProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import java.util.Properties;
+
+public abstract class FileIdPrefixProvider {
+
+  private final Properties props;
+
+  public FileIdPrefixProvider(Properties props) {
+    this.props = props;
+  }
+
+  public Properties getProps() {
+    return props;
+  }
+
+  public abstract String createFilePrefix(String partitionPath);
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/RandomFileIdPrefixProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/RandomFileIdPrefixProvider.java
new file mode 100644
index 0000000..89d9934
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/RandomFileIdPrefixProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.fs.FSUtils;
+
+import java.util.Properties;
+
+public class RandomFileIdPrefixProvider extends FileIdPrefixProvider {
+
+  public RandomFileIdPrefixProvider(Properties props) {
+    super(props);
+  }
+
+  @Override
+  public String createFilePrefix(String partitionPath) {
+    return FSUtils.createNewFileIdPfx();
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 8b7cb19..57b6306 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.client;
 
-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -30,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -41,6 +40,9 @@ import org.apache.hudi.table.HoodieJavaTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -153,11 +155,23 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
     throw new HoodieNotSupportedException("BulkInsert is not supported in HoodieJavaClient");
   }
 
+  public void transitionInflight(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    metaClient.getActiveTimeline().transitionRequestedToInflight(
+        new HoodieInstant(HoodieInstant.State.REQUESTED, metaClient.getCommitActionType(), instantTime),
+        Option.empty(), config.shouldAllowMultiWriteOnSameInstant());
+  }
+
   @Override
   public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
                                                     String instantTime,
                                                     Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
-    throw new HoodieNotSupportedException("BulkInsertPreppedRecords is not supported in HoodieJavaClient");
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime);
+    table.validateInsertSchema();
+    preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient());
+    HoodieWriteMetadata<List<WriteStatus>> result = table.bulkInsertPrepped(context, instantTime, preppedRecords, bulkInsertPartitioner);
+    return postWrite(result, instantTime, table);
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
index 9142569..b7ea916 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -19,17 +19,18 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.execution.JavaLazyInsertIterable;
 import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.FileIdPrefixProvider;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
@@ -66,10 +67,14 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abst
                                                            final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
     HoodieWriteMetadata result = new HoodieWriteMetadata();
 
-    //transition bulk_insert state to inflight
-    table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
-            table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
-        config.shouldAllowMultiWriteOnSameInstant());
+    // It's possible the transition to inflight could have already happened.
+    if (!table.getActiveTimeline().filterInflights().containsInstant(instantTime)) {
+      table.getActiveTimeline().transitionRequestedToInflight(
+          new HoodieInstant(HoodieInstant.State.REQUESTED, table.getMetaClient().getCommitActionType(), instantTime),
+          Option.empty(),
+          config.shouldAllowMultiWriteOnSameInstant());
+    }
+
     // write new files
     List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false);
     //update index
@@ -102,12 +107,16 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abst
         : JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
     repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
 
-    String idPfx = FSUtils.createNewFileIdPfx();
+    FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
+        config.getFileIdPrefixProviderClassName(),
+        config.getProps());
 
     List<WriteStatus> writeStatuses = new ArrayList<>();
 
-    new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true, config, instantTime, table, idPfx,
-        table.getTaskContextSupplier(), new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
+    new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true,
+        config, instantTime, table,
+        fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(),
+        new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
 
     return writeStatuses;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
index 4e53cca..18b7de2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
@@ -30,7 +30,8 @@ public class ConfigGroups {
     FLINK_SQL("Flink Sql Configs"),
     WRITE_CLIENT("Write Client Configs"),
     METRICS("Metrics Configs"),
-    RECORD_PAYLOAD("Record Payload Config");
+    RECORD_PAYLOAD("Record Payload Config"),
+    KAFKA_CONNECT("Kafka Connect Configs");
 
     public final String name;
 
@@ -72,6 +73,9 @@ public class ConfigGroups {
         description = "These set of configs are used to enable monitoring and reporting of key"
             + "Hudi stats and metrics.";
         break;
+      case KAFKA_CONNECT:
+        description = "These set of configs are used for Kakfa Connect Sink Connector for writing Hudi Tables";
+        break;
       default:
         description = "Please fill in the description for Config Group Name: " + names.name;
         break;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index b571efa..dee91b2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.exception.HoodieException;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index cde87d4..6016008 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -71,17 +71,20 @@ public final class SchemaTestUtil {
     return toRecords(getSimpleSchema(), getSimpleSchema(), from, limit);
   }
 
+  public static List<String> generateTestJsonRecords(int from, int limit) throws IOException, URISyntaxException {
+    Path dataPath = initializeSampleDataPath();
+
+    try (Stream<String> stream = Files.lines(dataPath)) {
+      return stream.skip(from).limit(limit).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not read data from " + RESOURCE_SAMPLE_DATA, e);
+    }
+  }
+
   private static List<IndexedRecord> toRecords(Schema writerSchema, Schema readerSchema, int from, int limit)
       throws IOException, URISyntaxException {
     GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
-    // Required to register the necessary JAR:// file system
-    URI resource = SchemaTestUtil.class.getResource(RESOURCE_SAMPLE_DATA).toURI();
-    Path dataPath;
-    if (resource.toString().contains("!")) {
-      dataPath = uriToPath(resource);
-    } else {
-      dataPath = Paths.get(SchemaTestUtil.class.getResource(RESOURCE_SAMPLE_DATA).toURI());
-    }
+    Path dataPath = initializeSampleDataPath();
 
     try (Stream<String> stream = Files.lines(dataPath)) {
       return stream.skip(from).limit(limit).map(s -> {
@@ -96,6 +99,21 @@ public final class SchemaTestUtil {
     }
   }
 
+  /**
+   * Required to register the necessary JAR:// file system.
+   * @return Path to the sample data in the resource file.
+   * @throws IOException
+   * @throws URISyntaxException
+   */
+  private static Path initializeSampleDataPath() throws IOException, URISyntaxException {
+    URI resource = SchemaTestUtil.class.getResource(RESOURCE_SAMPLE_DATA).toURI();
+    if (resource.toString().contains("!")) {
+      return uriToPath(resource);
+    } else {
+      return Paths.get(SchemaTestUtil.class.getResource(RESOURCE_SAMPLE_DATA).toURI());
+    }
+  }
+
   public static Path uriToPath(URI uri) throws IOException {
     final Map<String, String> env = new HashMap<>();
     final String[] array = uri.toString().split("!");
diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md
new file mode 100644
index 0000000..fd0a5d0
--- /dev/null
+++ b/hudi-kafka-connect/README.md
@@ -0,0 +1,94 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+-->
+
+# Quick Start guide for Kafka Connect Sink for Hudi
+
+This module provides a Kafka Connect Sink Connector for Hudi that ingests immutable data from Kafka topics into Hudi tables.
+
+## Building the connector
+
+To use this connector, you first need to build it. To do that, install the following dependencies:
+
+- [Java 1.8+](https://openjdk.java.net/)
+- [Apache Maven](https://maven.apache.org/)
+
+After installing these dependencies, execute the following command:
+
+```bash
+cd $HUDI_DIR
+mvn clean package
+```
+
+## Incremental Builds
+
+```bash
+mvn clean -pl hudi-kafka-connect install -DskipTests
+mvn clean -pl packaging/hudi-kafka-connect-bundle install
+```
+
+## Put the Hudi connector on the Kafka Connect classpath
+
+```bash
+cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/java/hudi-kafka-connect/
+```
+
+## Trying the connector
+
+After building the package, install Apache Kafka. The commands below are assumed to be run from the Kafka installation directory.
+
+### 1 - Starting the environment
+
+Start ZooKeeper and Kafka:
+
+```bash
+./bin/zookeeper-server-start.sh ./config/zookeeper.properties
+./bin/kafka-server-start.sh ./config/server.properties
+```
+
+Wait until the Kafka cluster is up and running.
+
+### 2 - Create the Hudi Control Topic for Coordination of the transactions
+
+The control topic should have only `1` partition.
+
+```bash
+./bin/kafka-topics.sh --delete --topic hudi-control-topic --bootstrap-server localhost:9092
+./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
+```
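+
+You can optionally verify that the control topic was created with a single partition:
+
+```bash
+./bin/kafka-topics.sh --describe --topic hudi-control-topic --bootstrap-server localhost:9092
+```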
+
+### 3 - Create the Hudi Topic for the Sink and insert data into the topic
+
+Open a terminal to execute the following command:
+
+```bash
+bash runKafkaTrafficGenerator.sh <total_messages>
+```
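+
+For example, to produce 10,000 test records, run the generator from the `hudi-kafka-connect/scripts` directory (it reads `raw.json` from the working directory and assumes `$KAFKA_HOME` is set and `jq` and `kcat` are installed):
+
+```bash
+cd $HUDI_DIR/hudi-kafka-connect/scripts
+bash runKafkaTrafficGenerator.sh 10000
+```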
+
+### 4 - Run the Sink connector worker (multiple workers can be run)
+
+Open a terminal to execute the following command:
+
+```bash
+./bin/connect-distributed.sh ../hudi-kafka-connect/configs/connect-distributed.properties
+```
+
+### 5 - Add the Hudi Sink to the Connect cluster (delete it first if you want to re-configure)
+
+```bash
+curl -X DELETE http://localhost:8083/connectors/hudi-sink
+curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect/configs/config-sink.json http://localhost:8083/connectors
+```
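+
+Note that `config-sink.json` points the schema provider at `file:///tmp/hoodie/schema.avsc`, so that file needs to exist before the sink can write. A minimal schema matching the test records generated from `scripts/raw.json` might look like the following (record name and field types are inferred from the sample data):
+
+```json
+{
+  "type": "record",
+  "name": "stock_ticks",
+  "fields": [
+    {"name": "volume", "type": "long"},
+    {"name": "symbol", "type": "string"},
+    {"name": "ts", "type": "string"},
+    {"name": "month", "type": "string"},
+    {"name": "high", "type": "double"},
+    {"name": "low", "type": "double"},
+    {"name": "key", "type": "string"},
+    {"name": "year", "type": "int"},
+    {"name": "date", "type": "string"},
+    {"name": "close", "type": "double"},
+    {"name": "open", "type": "double"},
+    {"name": "day", "type": "string"}
+  ]
+}
+```
+
+Once the sink is registered, you can check that it is running via the standard Kafka Connect REST API, for example:
+
+```bash
+curl http://localhost:8083/connectors/hudi-sink/status
+```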
diff --git a/hudi-kafka-connect/configs/config-sink.json b/hudi-kafka-connect/configs/config-sink.json
new file mode 100644
index 0000000..4e94bf5
--- /dev/null
+++ b/hudi-kafka-connect/configs/config-sink.json
@@ -0,0 +1,19 @@
+{
+    "name": "hudi-sink",
+    "config": {
+		"bootstrap.servers": "localhost:9092",
+		"connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
+		"tasks.max": "4",
+		"key.converter": "org.apache.kafka.connect.storage.StringConverter",
+		"value.converter": "org.apache.kafka.connect.storage.StringConverter",
+		"value.converter.schemas.enable": "false",
+		"topics": "hudi-test-topic",
+		"hoodie.table.name": "hudi-test-topic",
+		"hoodie.base.path": "file:///tmp/hoodie/sample-table",
+		"hoodie.datasource.write.recordkey.field": "volume",
+		"hoodie.datasource.write.partitionpath.field": "year",
+		"hoodie.schemaprovider.class": "org.apache.hudi.schema.FilebasedSchemaProvider",
+		"hoodie.deltastreamer.schemaprovider.source.schema.file": "file:///tmp/hoodie/schema.avsc",
+		"hoodie.deltastreamer.schemaprovider.target.schema.file": "file:///tmp/hoodie/schema.avsc"
+    }
+}
diff --git a/hudi-kafka-connect/configs/connect-distributed.properties b/hudi-kafka-connect/configs/connect-distributed.properties
new file mode 100644
index 0000000..d7d453c
--- /dev/null
+++ b/hudi-kafka-connect/configs/connect-distributed.properties
@@ -0,0 +1,33 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+bootstrap.servers=localhost:9092
+group.id=hudi-connect-cluster
+key.converter=org.apache.kafka.connect.json.JsonConverter
+value.converter=org.apache.kafka.connect.json.JsonConverter
+key.converter.schemas.enable=true
+value.converter.schemas.enable=true
+offset.storage.topic=connect-offsets
+offset.storage.replication.factor=1
+config.storage.topic=connect-configs
+config.storage.replication.factor=1
+status.storage.topic=connect-status
+status.storage.replication.factor=1
+
+offset.flush.interval.ms=60000
+listeners=HTTP://:8083
+plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
diff --git a/hudi-kafka-connect/pom.xml b/hudi-kafka-connect/pom.xml
new file mode 100644
index 0000000..7742f3b
--- /dev/null
+++ b/hudi-kafka-connect/pom.xml
@@ -0,0 +1,231 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hudi</artifactId>
+        <groupId>org.apache.hudi</groupId>
+        <version>0.10.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hudi-kafka-connect</artifactId>
+    <description>Kafka Connect Sink Connector for Hudi</description>
+    <version>0.10.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <main.basedir>${project.parent.basedir}</main.basedir>
+        <connect.api.version>2.5.0</connect.api.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.jacoco</groupId>
+                <artifactId>jacoco-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.1.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+            </plugin>
+        </plugins>
+
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+            <resource>
+                <directory>src/test/resources</directory>
+            </resource>
+        </resources>
+    </build>
+
+    <dependencies>
+        <!-- Kafka Connect -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <version>${connect.api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-json</artifactId>
+            <version>${connect.api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Hudi Dependencies -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-java-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-client-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- NOTE: This is temporary (SchemaProvider dep) until PR3162 lands -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-flink_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>1.12.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.esotericsoftware.kryo</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Logging -->
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+        </dependency>
+        
+        <!-- Fasterxml -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-csv</artifactId>
+            <version>${fasterxml.version}</version>
+        </dependency>
+
+        <!-- Avro -->
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
+
+        <!-- Hadoop -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <!-- Hudi - Test -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-common</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Tests Misc -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.vintage</groupId>
+            <artifactId>junit-vintage-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.platform</groupId>
+            <artifactId>junit-platform-runner</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.platform</groupId>
+            <artifactId>junit-platform-suite-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.platform</groupId>
+            <artifactId>junit-platform-commons</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/hudi-kafka-connect/scripts/raw.json b/hudi-kafka-connect/scripts/raw.json
new file mode 100644
index 0000000..aa2cc70
--- /dev/null
+++ b/hudi-kafka-connect/scripts/raw.json
@@ -0,0 +1,5 @@
+{"volume": 0, "symbol": "TPNL", "ts": "2017-08-31 09:30:00", "month": "08", "high": 6.37, "low": 1.37, "key": "TPNL_2017-08-31 09", "year": 2017, "date": "2017/08/31", "close": 4.44, "open": 1.37, "day": "31"}
+{"volume": 0, "symbol": "SPOT", "ts": "2018-08-31 09:30:00", "month": "08", "high": 1.87, "low": 0.37, "key": "TPNL_2018-08-31 09", "year": 2018, "date": "2018/08/31", "close": 1.44, "open": 1.77, "day": "31"}
+{"volume": 0, "symbol": "GOOG", "ts": "2019-08-31 09:30:00", "month": "08", "high": 2.1, "low": 1.7, "key": "TPNL_2019-08-31 09", "year": 2019, "date": "2019/08/31", "close": 1.94, "open": 2.0, "day": "31"}
+{"volume": 0, "symbol": "MSFT", "ts": "2020-08-31 09:30:00", "month": "08", "high": 3.33, "low": 0.87, "key": "TPNL_2020-08-31 09", "year": 2020, "date": "2020/08/31", "close": 3.33, "open": 3.1, "day": "31"}
+{"volume": 0, "symbol": "APPL", "ts": "2021-08-31 09:30:00", "month": "08", "high": 3.17, "low": 2.37, "key": "TPNL_2021-08-31 09", "year": 2021, "date": "2021/08/31", "close": 2.66, "open": 3.1, "day": "31"}
diff --git a/hudi-kafka-connect/scripts/runKafkaTrafficGenerator.sh b/hudi-kafka-connect/scripts/runKafkaTrafficGenerator.sh
new file mode 100644
index 0000000..cff4140
--- /dev/null
+++ b/hudi-kafka-connect/scripts/runKafkaTrafficGenerator.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+# First delete the existing topic
+$KAFKA_HOME/bin/kafka-topics.sh --delete --topic hudi-test-topic --bootstrap-server localhost:9092
+
+# Create the topic with 4 partitions
+$KAFKA_HOME/bin/kafka-topics.sh --create --topic hudi-test-topic --partitions 4 --replication-factor 1 --bootstrap-server localhost:9092
+
+# Generate kafka messages from raw records
+inputFile="raw.json"
+# Generate the records with unique keys
+for ((recordKey=0; recordKey<=$1;  ))
+do 
+	while IFS= read line 
+	do
+		echo $line |  jq --argjson recordKey $recordKey -c '.volume = $recordKey' | kcat -P -b localhost:9092 -t hudi-test-topic
+		((recordKey++))
+		if [ $(( $recordKey % 1000 )) -eq 0 ]
+			then sleep 1
+		fi
+	done < "$inputFile"
+done 
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkConnector.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkConnector.java
new file mode 100644
index 0000000..2d8cc47
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkConnector.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieSinkConnector is a Kafka Connect Connector implementation
+ * that ingests data from Kafka to Hudi.
+ */
+public class HoodieSinkConnector extends SinkConnector {
+
+  public static final String VERSION = "0.1.0";
+  private static final Logger LOG = LogManager.getLogger(HoodieSinkConnector.class);
+  private Map<String, String> configProps;
+
+  /**
+   * No-arg constructor. It is instantiated by Connect framework.
+   */
+  public HoodieSinkConnector() {
+  }
+
+  @Override
+  public String version() {
+    return VERSION;
+  }
+
+  @Override
+  public void start(Map<String, String> props) {
+    configProps = new HashMap<>(props);
+  }
+
+  @Override
+  public Class<? extends Task> taskClass() {
+    return HoodieSinkTask.class;
+  }
+
+  @Override
+  public List<Map<String, String>> taskConfigs(int maxTasks) {
+    Map<String, String> taskProps = new HashMap<>(configProps);
+    List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
+    for (int i = 0; i < maxTasks; ++i) {
+      taskConfigs.add(taskProps);
+    }
+    return taskConfigs;
+  }
+
+  @Override
+  public void stop() {
+    LOG.info(String.format("Shutting down Hudi Sink connector %s", configProps.get("name")));
+  }
+
+  @Override
+  public ConfigDef config() {
+    // we use Hudi configs instead
+    return new ConfigDef();
+  }
+}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
new file mode 100644
index 0000000..c7dde9a
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect;
+
+import org.apache.hudi.connect.kafka.KafkaConnectControlAgent;
+import org.apache.hudi.connect.transaction.ConnectTransactionCoordinator;
+import org.apache.hudi.connect.transaction.ConnectTransactionParticipant;
+import org.apache.hudi.connect.transaction.TransactionCoordinator;
+import org.apache.hudi.connect.transaction.TransactionParticipant;
+import org.apache.hudi.connect.writers.KafkaConnectConfigs;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of the {@link SinkTask} interface provided by
+ * Kafka Connect. Implements methods to receive the Kafka records
+ * from the assigned partitions and commit the Kafka offsets.
+ * Also, handles re-assignments of partitions.
+ */
+public class HoodieSinkTask extends SinkTask {
+
+  public static final String TASK_ID_CONFIG_NAME = "task.id";
+  private static final Logger LOG = LogManager.getLogger(HoodieSinkTask.class);
+  private static final int COORDINATOR_KAFKA_PARTITION = 0;
+
+  private final Map<TopicPartition, TransactionCoordinator> transactionCoordinators;
+  private final Map<TopicPartition, TransactionParticipant> hudiTransactionParticipants;
+  private KafkaConnectControlAgent controlKafkaClient;
+  private KafkaConnectConfigs connectConfigs;
+
+  private String taskId;
+  private String connectorName;
+
+  public HoodieSinkTask() {
+    transactionCoordinators = new HashMap<>();
+    hudiTransactionParticipants = new HashMap<>();
+  }
+
+  @Override
+  public String version() {
+    return HoodieSinkConnector.VERSION;
+  }
+
+  @Override
+  public void start(Map<String, String> props) {
+    connectorName = props.get("name");
+    taskId = props.get(TASK_ID_CONFIG_NAME);
+    LOG.info(String.format("Starting Hudi Sink Task for %s connector %s with id %s with assignments %s",
+        props, connectorName, taskId, context.assignment()));
+    try {
+      connectConfigs = KafkaConnectConfigs.newBuilder().withProperties(props).build();
+      controlKafkaClient = KafkaConnectControlAgent.createKafkaControlManager(
+          connectConfigs.getBootstrapServers(),
+          connectConfigs.getControlTopicName());
+      bootstrap(context.assignment());
+    } catch (ConfigException e) {
+      throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", e);
+    } catch (ConnectException e) {
+      LOG.error("Couldn't start HudiSinkConnector:", e);
+      LOG.info("Shutting down HudiSinkConnector.");
+      cleanup();
+      // Always throw the original exception that prevented us from starting
+      throw e;
+    }
+  }
+
+  @Override
+  public void put(Collection<SinkRecord> records) {
+    for (SinkRecord record : records) {
+      String topic = record.topic();
+      int partition = record.kafkaPartition();
+      TopicPartition tp = new TopicPartition(topic, partition);
+      hudiTransactionParticipants.get(tp).buffer(record);
+    }
+
+    for (TopicPartition partition : context.assignment()) {
+      hudiTransactionParticipants.get(partition).processRecords();
+    }
+  }
+
+  @Override
+  public void stop() {
+    cleanup();
+  }
+
+  @Override
+  public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+    // No-op. The connector is managing the offsets.
+  }
+
+  @Override
+  public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+    // Although the connector manages offsets via commit files in Hudi, we still want to have Connect
+    // commit the consumer offsets for records this task has consumed from its topic partitions and
+    // committed to Hudi.
+    Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
+    for (TopicPartition partition : context.assignment()) {
+      TransactionParticipant worker = hudiTransactionParticipants.get(partition);
+      if (worker != null) {
+        worker.processRecords();
+        if (worker.getLastKafkaCommittedOffset() >= 0) {
+          result.put(partition, new OffsetAndMetadata(worker.getLastKafkaCommittedOffset()));
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void open(Collection<TopicPartition> partitions) {
+    LOG.info("New partitions added " + partitions.toString());
+    bootstrap(partitions);
+  }
+
+  @Override
+  public void close(Collection<TopicPartition> partitions) {
+    LOG.info("Existing partitions deleted " + partitions.toString());
+    // Close any writers we have. We may get assigned the same partitions again and end up
+    // duplicating some effort, since we will have to reprocess those records. Holding on to the
+    // writers across a rebalance could avoid that, but it gets significantly more complex due to
+    // potential failures and network partitions (e.g. we may get this close, then miss a few
+    // generations of group membership, during which data may have continued to be processed).
+    // For now, we prefer the simpler solution that may result in a bit of wasted effort.
+    for (TopicPartition partition : partitions) {
+      if (partition.partition() == COORDINATOR_KAFKA_PARTITION) {
+        if (transactionCoordinators.containsKey(partition)) {
+          transactionCoordinators.get(partition).stop();
+          transactionCoordinators.remove(partition);
+        }
+      }
+      TransactionParticipant worker = hudiTransactionParticipants.remove(partition);
+      if (worker != null) {
+        try {
+          LOG.debug("Closing data writer due to task start failure.");
+          worker.stop();
+        } catch (Throwable t) {
+          LOG.debug(String.format("Error closing and stopping data writer: %s", t.getMessage()), t);
+        }
+      }
+    }
+  }
+
+  private void bootstrap(Collection<TopicPartition> partitions) {
+    LOG.info(String.format("Bootstrap task for connector %s with id %s with assignments %s part %s",
+        connectorName, taskId, context.assignment(), partitions));
+    for (TopicPartition partition : partitions) {
+      try {
+        // If the partition is 0, instantiate the Leader
+        if (partition.partition() == COORDINATOR_KAFKA_PARTITION) {
+          ConnectTransactionCoordinator coordinator = new ConnectTransactionCoordinator(
+              connectConfigs,
+              partition,
+              controlKafkaClient);
+          coordinator.start();
+          transactionCoordinators.put(partition, coordinator);
+        }
+        ConnectTransactionParticipant worker = new ConnectTransactionParticipant(connectConfigs, partition, controlKafkaClient, context);
+        hudiTransactionParticipants.put(partition, worker);
+        worker.start();
+      } catch (HoodieException exception) {
+        LOG.error(String.format("Fatal error initializing task %s for partition %s", taskId, partition.partition()), exception);
+      }
+    }
+  }
+
+  private void cleanup() {
+    for (TopicPartition partition : context.assignment()) {
+      TransactionParticipant worker = hudiTransactionParticipants.get(partition);
+      if (worker != null) {
+        try {
+          LOG.debug("Closing data writer due to task start failure.");
+          worker.stop();
+        } catch (Throwable t) {
+          LOG.debug("Error closing and stopping data writer", t);
+        }
+      }
+    }
+    hudiTransactionParticipants.clear();
+    transactionCoordinators.forEach((topic, transactionCoordinator) -> transactionCoordinator.stop());
+    transactionCoordinators.clear();
+  }
+}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
new file mode 100644
index 0000000..536ad4a
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.FileIdPrefixProvider;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Objects;
+import java.util.Properties;
+
+public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
+
+  public static final String KAFKA_CONNECT_PARTITION_ID = "hudi.kafka.connect.partition";
+  private static final Logger LOG = LogManager.getLogger(KafkaConnectFileIdPrefixProvider.class);
+
+  private final String kafkaPartition;
+
+  public KafkaConnectFileIdPrefixProvider(Properties props) {
+    super(props);
+    if (!props.containsKey(KAFKA_CONNECT_PARTITION_ID)) {
+      LOG.error("Fatal error due to Kafka Connect Partition Id is not set");
+      throw new HoodieException("Kafka Connect Partition Key " + KAFKA_CONNECT_PARTITION_ID + " not provided");
+    }
+    this.kafkaPartition = props.getProperty(KAFKA_CONNECT_PARTITION_ID);
+  }
+
+  @Override
+  public String createFilePrefix(String partitionPath) {
+    // We use a combination of kafka partition and partition path as the file id, and then hash it
+    // to generate a fixed sized hash.
+    String rawFileIdPrefix = kafkaPartition + partitionPath;
+    MessageDigest md;
+    try {
+      md = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      LOG.error("Fatal error selecting hash algorithm", e);
+      throw new HoodieException(e);
+    }
+
+    byte[] digest = Objects.requireNonNull(md).digest(rawFileIdPrefix.getBytes(StandardCharsets.UTF_8));
+
+    LOG.info("CreateFileId for Kafka Partition " + kafkaPartition + " : " + partitionPath + " = " + rawFileIdPrefix
+        + " === " + StringUtils.toHexString(digest).toUpperCase());
+    return StringUtils.toHexString(digest).toUpperCase();
+  }
+}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java
new file mode 100644
index 0000000..a115147
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.kafka;
+
+import org.apache.hudi.connect.transaction.ControlEvent;
+import org.apache.hudi.connect.transaction.TransactionCoordinator;
+import org.apache.hudi.connect.transaction.TransactionParticipant;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manages the Kafka consumer and producer for the Kafka Control Topic,
+ * which coordinates the {@link TransactionCoordinator} and
+ * {@link TransactionParticipant}s. A single (single-threaded) instance is
+ * used per worker, and the tasks register with it to receive the
+ * control messages.
+ */
+public class KafkaConnectControlAgent implements KafkaControlAgent {
+
+  private static final Logger LOG = LogManager.getLogger(KafkaConnectControlAgent.class);
+  private static final Object LOCK = new Object();
+  private static final long KAFKA_POLL_TIMEOUT_MS = 100;
+  private static final int EXEC_SHUTDOWN_TIMEOUT_MS = 5000;
+
+  private static KafkaConnectControlAgent agent;
+  private final String bootstrapServers;
+  private final String controlTopicName;
+  private final ExecutorService executorService;
+  private final Map<String, TransactionCoordinator> topicCoordinators;
+  // List of TransactionParticipants per Kafka Topic
+  private final Map<String, ConcurrentLinkedQueue<TransactionParticipant>> partitionWorkers;
+  private final KafkaControlProducer producer;
+  private KafkaConsumer<String, ControlEvent> consumer;
+
+  public KafkaConnectControlAgent(String bootstrapServers,
+                                  String controlTopicName) {
+    this.bootstrapServers = bootstrapServers;
+    this.controlTopicName = controlTopicName;
+    this.executorService = Executors.newSingleThreadExecutor();
+    this.topicCoordinators = new HashMap<>();
+    this.partitionWorkers = new HashMap<>();
+    this.producer = new KafkaControlProducer(bootstrapServers, controlTopicName);
+    start();
+  }
+
+  public static KafkaConnectControlAgent createKafkaControlManager(String bootstrapServers,
+                                                                   String controlTopicName) {
+    if (agent == null) {
+      synchronized (LOCK) {
+        if (agent == null) {
+          agent = new KafkaConnectControlAgent(bootstrapServers, controlTopicName);
+        }
+      }
+    }
+    return agent;
+  }
+
+  @Override
+  public void registerTransactionParticipant(TransactionParticipant worker) {
+    if (!partitionWorkers.containsKey(worker.getPartition().topic())) {
+      partitionWorkers.put(worker.getPartition().topic(), new ConcurrentLinkedQueue<>());
+    }
+    partitionWorkers.get(worker.getPartition().topic()).add(worker);
+  }
+
+  @Override
+  public void deregisterTransactionParticipant(TransactionParticipant worker) {
+    if (partitionWorkers.containsKey(worker.getPartition().topic())) {
+      partitionWorkers.get(worker.getPartition().topic()).remove(worker);
+    }
+  }
+
+  @Override
+  public void registerTransactionCoordinator(TransactionCoordinator coordinator) {
+    if (!topicCoordinators.containsKey(coordinator.getPartition().topic())) {
+      topicCoordinators.put(coordinator.getPartition().topic(), coordinator);
+    }
+  }
+
+  public void deregisterTransactionCoordinator(TransactionCoordinator coordinator) {
+    topicCoordinators.remove(coordinator.getPartition().topic());
+  }
+
+  @Override
+  public void publishMessage(ControlEvent message) {
+    producer.publishMessage(message);
+  }
+
+  private void start() {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    // TODO: fetch the worker id or name instead of a random UUID.
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "hudi-control-group" + UUID.randomUUID().toString());
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);
+
+    // Since the Kafka Control Topic is used as an RPC-like interface,
+    // consumers should only process messages that are sent after they come online.
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+
+    consumer = new KafkaConsumer<>(props, new StringDeserializer(),
+        new KafkaJsonDeserializer<>(ControlEvent.class));
+
+    consumer.subscribe(Collections.singletonList(controlTopicName));
+
+    executorService.submit(() -> {
+      while (true) {
+        ConsumerRecords<String, ControlEvent> records;
+        records = consumer.poll(Duration.ofMillis(KAFKA_POLL_TIMEOUT_MS));
+        for (ConsumerRecord<String, ControlEvent> record : records) {
+          try {
+            LOG.debug(String.format("Kafka consumerGroupId = %s topic = %s, partition = %s, offset = %s, customer = %s, country = %s",
+                "", record.topic(), record.partition(), record.offset(), record.key(), record.value()));
+            ControlEvent message = record.value();
+            String senderTopic = message.senderPartition().topic();
+            if (message.getSenderType().equals(ControlEvent.SenderType.COORDINATOR)) {
+              if (partitionWorkers.containsKey(senderTopic)) {
+                for (TransactionParticipant partitionWorker : partitionWorkers.get(senderTopic)) {
+                  partitionWorker.processControlEvent(message);
+                }
+              } else {
+                LOG.warn(String.format("Failed to send message for unregistered participants for topic %s", senderTopic));
+              }
+            } else if (message.getSenderType().equals(ControlEvent.SenderType.PARTICIPANT)) {
+              if (topicCoordinators.containsKey(senderTopic)) {
+                topicCoordinators.get(senderTopic).processControlEvent(message);
+              } else {
+                LOG.warn(String.format("Failed to send message for unregistered coordinator for topic %s", senderTopic));
+              }
+            } else {
+              LOG.warn(String.format("Sender type of Control Message unknown %s", message.getSenderType().name()));
+            }
+          } catch (Exception e) {
+            LOG.error(String.format("Fatal error while consuming a kafka record for topic = %s partition = %s", record.topic(), record.partition()), e);
+          }
+        }
+        try {
+          consumer.commitSync();
+        } catch (CommitFailedException exception) {
+          LOG.error("Fatal error while committing kafka control topic");
+        }
+      }
+    });
+  }
+
+  public void stop() {
+    producer.stop();
+    consumer.close();
+    if (executorService != null) {
+      boolean terminated = false;
+      try {
+        LOG.info("Shutting down executor service.");
+        executorService.shutdown();
+        LOG.info("Awaiting termination.");
+        terminated = executorService.awaitTermination(EXEC_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        // ignored
+      }
+
+      if (!terminated) {
+        LOG.warn("Unclean shutdown of the Kafka Control Agent executor service");
+        executorService.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Deserializes the incoming Kafka records for the Control Topic.
+   *
+   * @param <T> represents the object that is sent over the Control Topic.
+   */
+  public static class KafkaJsonDeserializer<T> implements Deserializer<T> {
+
+    private static final Logger LOG = LogManager.getLogger(KafkaJsonDeserializer.class);
+    private final Class<T> type;
+
+    KafkaJsonDeserializer(Class<T> type) {
+      this.type = type;
+    }
+
+    @Override
+    public T deserialize(String s, byte[] bytes) {
+      ObjectMapper mapper = new ObjectMapper();
+      T obj = null;
+      try {
+        obj = mapper.readValue(bytes, type);
+      } catch (Exception e) {
+        LOG.error("Failed to deserialize control event", e);
+      }
+      return obj;
+    }
+  }
+}
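
As a usage sketch (the broker address and control topic name below are placeholders), a worker obtains the shared agent via the factory method and the sink tasks then register their coordinator and participants against it:

import org.apache.hudi.connect.kafka.KafkaConnectControlAgent;

public class ControlAgentWiringExample {
  public static void main(String[] args) {
    // One agent per worker process; the factory method returns the shared instance.
    KafkaConnectControlAgent agent = KafkaConnectControlAgent.createKafkaControlManager(
        "localhost:9092",        // placeholder bootstrap servers
        "hudi-control-topic");   // placeholder control topic name

    // Coordinators and participants created by the sink tasks register themselves here, e.g.
    //   agent.registerTransactionCoordinator(coordinator);
    //   agent.registerTransactionParticipant(participant);

    agent.stop();
  }
}
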
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlAgent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlAgent.java
new file mode 100644
index 0000000..ea5177e
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlAgent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.kafka;
+
+import org.apache.hudi.connect.transaction.ControlEvent;
+import org.apache.hudi.connect.transaction.TransactionCoordinator;
+import org.apache.hudi.connect.transaction.TransactionParticipant;
+
+/**
+ * Manages the Kafka consumer and producer for
+ * the Kafka Control Topic that ensures coordination across the
+ * {@link TransactionCoordinator} and {@link TransactionParticipant}s.
+ */
+public interface KafkaControlAgent {
+
+  void registerTransactionParticipant(TransactionParticipant worker);
+
+  void deregisterTransactionParticipant(TransactionParticipant worker);
+
+  void registerTransactionCoordinator(TransactionCoordinator coordinator);
+
+  void deregisterTransactionCoordinator(TransactionCoordinator coordinator);
+
+  void publishMessage(ControlEvent message);
+}
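
Because the coordinator and participants are written against this interface, a Kafka-free stub is enough to unit test the transaction protocol. The following in-process sketch (not part of this patch) routes published events directly to the registered components, mirroring the dispatch logic of KafkaConnectControlAgent:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;

/** In-process stub that bypasses Kafka and dispatches control events directly (test-only sketch). */
public class InMemoryControlAgent implements KafkaControlAgent {

  private final Map<String, List<TransactionParticipant>> participants = new HashMap<>();
  private final Map<String, TransactionCoordinator> coordinators = new HashMap<>();

  @Override
  public void registerTransactionParticipant(TransactionParticipant worker) {
    participants.computeIfAbsent(worker.getPartition().topic(), t -> new ArrayList<>()).add(worker);
  }

  @Override
  public void deregisterTransactionParticipant(TransactionParticipant worker) {
    participants.getOrDefault(worker.getPartition().topic(), new ArrayList<>()).remove(worker);
  }

  @Override
  public void registerTransactionCoordinator(TransactionCoordinator coordinator) {
    coordinators.put(coordinator.getPartition().topic(), coordinator);
  }

  @Override
  public void deregisterTransactionCoordinator(TransactionCoordinator coordinator) {
    coordinators.remove(coordinator.getPartition().topic());
  }

  @Override
  public void publishMessage(ControlEvent message) {
    String topic = message.senderPartition().topic();
    if (message.getSenderType() == ControlEvent.SenderType.COORDINATOR) {
      // Events from the coordinator fan out to all participants of the topic
      participants.getOrDefault(topic, new ArrayList<>()).forEach(p -> p.processControlEvent(message));
    } else {
      // Events from participants go to the single coordinator of the topic
      TransactionCoordinator coordinator = coordinators.get(topic);
      if (coordinator != null) {
        coordinator.processControlEvent(message);
      }
    }
  }
}
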
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java
new file mode 100644
index 0000000..a23251e
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.kafka;
+
+import org.apache.hudi.connect.transaction.ControlEvent;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Properties;
+
+/**
+ * Kafka producer to send events to the
+ * Control Topic that coordinates transactions
+ * across Participants.
+ */
+public class KafkaControlProducer {
+
+  private static final Logger LOG = LogManager.getLogger(KafkaControlProducer.class);
+
+  private final String bootstrapServers;
+  private final String controlTopicName;
+  private Producer<String, ControlEvent> producer;
+
+  public KafkaControlProducer(String bootstrapServers, String controlTopicName) {
+    this.bootstrapServers = bootstrapServers;
+    this.controlTopicName = controlTopicName;
+    start();
+  }
+
+  private void start() {
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class);
+
+    producer = new KafkaProducer<>(
+        props,
+        new StringSerializer(),
+        new KafkaJsonSerializer()
+    );
+  }
+
+  public void stop() {
+    producer.close();
+  }
+
+  public void publishMessage(ControlEvent message) {
+    ProducerRecord<String, ControlEvent> record
+        = new ProducerRecord<>(controlTopicName, message.key(), message);
+    producer.send(record);
+  }
+
+  public static class KafkaJsonSerializer implements Serializer<ControlEvent> {
+
+    private static final Logger LOG = LogManager.getLogger(KafkaJsonSerializer.class);
+
+    @Override
+    public byte[] serialize(String topic, ControlEvent data) {
+      byte[] retVal = null;
+      ObjectMapper objectMapper = new ObjectMapper();
+      objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+
+      try {
+        retVal = objectMapper.writeValueAsBytes(data);
+      } catch (Exception e) {
+        LOG.error("Fatal error during serialization of Kafka Control Message ", e);
+      }
+      return retVal;
+    }
+  }
+}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
new file mode 100644
index 0000000..13291c8
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.transaction;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.connect.kafka.KafkaControlAgent;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
+import org.apache.hudi.connect.writers.ConnectTransactionServices;
+import org.apache.hudi.connect.writers.KafkaConnectConfigs;
+import org.apache.hudi.connect.writers.KafkaConnectTransactionServices;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of the Coordinator that
+ * coordinates the Hudi write transactions
+ * across all the Kafka partitions for a single Kafka Topic.
+ */
+public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
+
+  private static final Logger LOG = LogManager.getLogger(ConnectTransactionCoordinator.class);
+  private static final String BOOTSTRAP_SERVERS_CFG = "bootstrap.servers";
+  private static final String KAFKA_OFFSET_KEY = "kafka.commit.offsets";
+  private static final String KAFKA_OFFSET_DELIMITER = ",";
+  private static final String KAFKA_OFFSET_KV_DELIMITER = "=";
+  private static final Long START_COMMIT_INIT_DELAY_MS = 100L;
+  private static final Long RESTART_COMMIT_DELAY_MS = 500L;
+  private static final int COORDINATOR_EVENT_LOOP_TIMEOUT_MS = 1000;
+
+  private final KafkaConnectConfigs configs;
+  private final TopicPartition partition;
+  private final KafkaControlAgent kafkaControlClient;
+  private final ConnectTransactionServices transactionServices;
+  private final KafkaPartitionProvider partitionProvider;
+  private final Map<Integer, List<WriteStatus>> partitionsWriteStatusReceived;
+  private final Map<Integer, Long> currentConsumedKafkaOffsets;
+  private final AtomicBoolean hasStarted = new AtomicBoolean(false);
+  private final BlockingQueue<CoordinatorEvent> events;
+  private final ExecutorService executorService;
+  private final ScheduledExecutorService scheduler;
+
+  private String currentCommitTime;
+  private Map<Integer, Long> globalCommittedKafkaOffsets;
+  private State currentState;
+  private int numPartitions;
+
+  public ConnectTransactionCoordinator(KafkaConnectConfigs configs,
+                                       TopicPartition partition,
+                                       KafkaControlAgent kafkaControlClient) throws HoodieException {
+    this(configs,
+        partition,
+        kafkaControlClient,
+        new KafkaConnectTransactionServices(configs),
+        KafkaConnectUtils::getLatestNumPartitions);
+  }
+
+  public ConnectTransactionCoordinator(KafkaConnectConfigs configs,
+                                       TopicPartition partition,
+                                       KafkaControlAgent kafkaControlClient,
+                                       ConnectTransactionServices transactionServices,
+                                       KafkaPartitionProvider partitionProvider) {
+    this.configs = configs;
+    this.partition = partition;
+    this.kafkaControlClient = kafkaControlClient;
+    this.transactionServices = transactionServices;
+    this.partitionProvider = partitionProvider;
+    this.events = new LinkedBlockingQueue<>();
+    scheduler = Executors.newSingleThreadScheduledExecutor();
+    executorService = Executors.newSingleThreadExecutor();
+
+    this.currentCommitTime = StringUtils.EMPTY_STRING;
+    this.partitionsWriteStatusReceived = new HashMap<>();
+    this.globalCommittedKafkaOffsets = new HashMap<>();
+    this.currentConsumedKafkaOffsets = new HashMap<>();
+    this.currentState = State.INIT;
+  }
+
+  @Override
+  public void start() {
+    if (hasStarted.compareAndSet(false, true)) {
+      executorService.submit(this);
+    }
+    kafkaControlClient.registerTransactionCoordinator(this);
+    LOG.info(String.format("Start Transaction Coordinator for topic %s partition %s",
+        partition.topic(), partition.partition()));
+
+    initializeGlobalCommittedKafkaOffsets();
+    // Submit the first start commit
+    submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
+            partition.topic(),
+            StringUtils.EMPTY_STRING),
+        START_COMMIT_INIT_DELAY_MS, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void stop() {
+    kafkaControlClient.deregisterTransactionCoordinator(this);
+    hasStarted.set(false);
+    if (executorService != null) {
+      boolean terminated = false;
+      try {
+        LOG.info("Shutting down executor service.");
+        executorService.shutdown();
+        LOG.info("Awaiting termination.");
+        terminated = executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        // ignored
+      }
+
+      if (!terminated) {
+        LOG.warn("Unclean shutdown of the Transaction Coordinator executor service");
+        executorService.shutdownNow();
+      }
+    }
+  }
+
+  @Override
+  public TopicPartition getPartition() {
+    return partition;
+  }
+
+  @Override
+  public void processControlEvent(ControlEvent message) {
+    CoordinatorEvent.CoordinatorEventType type;
+    if (message.getMsgType().equals(ControlEvent.MsgType.WRITE_STATUS)) {
+      type = CoordinatorEvent.CoordinatorEventType.WRITE_STATUS;
+    } else {
+      LOG.warn(String.format("The Coordinator should not be receiving messages of type %s", message.getMsgType().name()));
+      return;
+    }
+
+    CoordinatorEvent event = new CoordinatorEvent(type,
+        message.senderPartition().topic(),
+        message.getCommitTime());
+    event.setMessage(message);
+    submitEvent(event);
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      try {
+        CoordinatorEvent event = events.poll(COORDINATOR_EVENT_LOOP_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        if (event != null) {
+          processCoordinatorEvent(event);
+        }
+      } catch (InterruptedException exception) {
+        LOG.warn("Error received while polling the event loop in Partition Coordinator", exception);
+      }
+    }
+  }
+
+  private void submitEvent(CoordinatorEvent event) {
+    this.submitEvent(event, 0, TimeUnit.SECONDS);
+  }
+
+  private void submitEvent(CoordinatorEvent event, long delay, TimeUnit unit) {
+    scheduler.schedule(() -> {
+      events.add(event);
+    }, delay, unit);
+  }
+
+  private void processCoordinatorEvent(CoordinatorEvent event) {
+    try {
+      // Ignore NULL and stale events, unless the event starts a new commit
+      if (event == null
+          || (!event.getEventType().equals(CoordinatorEvent.CoordinatorEventType.START_COMMIT)
+          && (!event.getCommitTime().equals(currentCommitTime)))) {
+        return;
+      }
+
+      switch (event.getEventType()) {
+        case START_COMMIT:
+          startNewCommit();
+          break;
+        case END_COMMIT:
+          endExistingCommit();
+          break;
+        case WRITE_STATUS:
+          // Ignore stale WRITE_STATUS messages received after the commit has already been concluded or timed out
+          if (event.getMessage() != null
+              && currentState.equals(State.ENDED_COMMIT)) {
+            onReceiveWriteStatus(event.getMessage());
+          } else {
+            LOG.warn("Could not process WRITE_STATUS due to missing message");
+          }
+          break;
+        case ACK_COMMIT:
+          submitAckCommit();
+          break;
+        case WRITE_STATUS_TIMEOUT:
+          handleWriteStatusTimeout();
+          break;
+        default:
+          throw new IllegalStateException("Partition Coordinator has received an illegal event type " + event.getEventType().name());
+      }
+    } catch (Exception exception) {
+      LOG.warn("Error received while polling the event loop in Partition Coordinator", exception);
+    }
+  }
+
+  private void startNewCommit() {
+    numPartitions = partitionProvider.getLatestNumPartitions(configs.getString(BOOTSTRAP_SERVERS_CFG), partition.topic());
+    partitionsWriteStatusReceived.clear();
+    try {
+      currentCommitTime = transactionServices.startCommit();
+      ControlEvent message = new ControlEvent.Builder(
+          ControlEvent.MsgType.START_COMMIT,
+          ControlEvent.SenderType.COORDINATOR,
+          currentCommitTime,
+          partition)
+          .setCoordinatorInfo(
+              new ControlEvent.CoordinatorInfo(globalCommittedKafkaOffsets))
+          .build();
+      kafkaControlClient.publishMessage(message);
+      currentState = State.STARTED_COMMIT;
+      // schedule a timeout for ending the current commit
+      submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.END_COMMIT,
+              partition.topic(),
+              currentCommitTime),
+          configs.getCommitIntervalSecs(), TimeUnit.SECONDS);
+    } catch (Exception exception) {
+      LOG.error(String.format("Failed to start a new commit %s, will retry", currentCommitTime), exception);
+      submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
+              partition.topic(),
+              StringUtils.EMPTY_STRING),
+          RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  private void endExistingCommit() {
+    try {
+      ControlEvent message = new ControlEvent.Builder(
+          ControlEvent.MsgType.END_COMMIT,
+          ControlEvent.SenderType.COORDINATOR,
+          currentCommitTime,
+          partition)
+          .setCoordinatorInfo(new ControlEvent.CoordinatorInfo(globalCommittedKafkaOffsets))
+          .build();
+      kafkaControlClient.publishMessage(message);
+    } catch (Exception exception) {
+      LOG.warn(String.format("Could not send END_COMMIT message for partition %s and commitTime %s", partition, currentCommitTime), exception);
+    }
+    currentConsumedKafkaOffsets.clear();
+    currentState = State.ENDED_COMMIT;
+
+    // schedule a timeout for receiving all write statuses
+    submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.WRITE_STATUS_TIMEOUT,
+            partition.topic(),
+            currentCommitTime),
+        configs.getCoordinatorWriteTimeoutSecs(), TimeUnit.SECONDS);
+  }
+
+  private void onReceiveWriteStatus(ControlEvent message) {
+    ControlEvent.ParticipantInfo participantInfo = message.getParticipantInfo();
+    if (participantInfo.getOutcomeType().equals(ControlEvent.OutcomeType.WRITE_SUCCESS)) {
+      int partition = message.senderPartition().partition();
+      partitionsWriteStatusReceived.put(partition, participantInfo.writeStatuses());
+      currentConsumedKafkaOffsets.put(partition, participantInfo.getKafkaCommitOffset());
+    }
+    if (partitionsWriteStatusReceived.size() >= numPartitions
+        && currentState.equals(State.ENDED_COMMIT)) {
+      // Commit the kafka offsets to the commit file
+      try {
+        List<WriteStatus> allWriteStatuses = new ArrayList<>();
+        partitionsWriteStatusReceived.forEach((key, value) -> allWriteStatuses.addAll(value));
+        // Commit the last write in Hudi, along with the latest kafka offset
+        if (!allWriteStatuses.isEmpty()) {
+          transactionServices.endCommit(currentCommitTime,
+              allWriteStatuses,
+              transformKafkaOffsets(currentConsumedKafkaOffsets));
+        }
+        currentState = State.WRITE_STATUS_RCVD;
+        globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
+        submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT,
+            partition.topic(),
+            currentCommitTime));
+      } catch (Exception exception) {
+        LOG.error("Fatal error while committing file", exception);
+      }
+    }
+  }
+
+  private void handleWriteStatusTimeout() {
+    // If we are still stuck in the ENDED_COMMIT state
+    if (currentState.equals(State.ENDED_COMMIT)) {
+      currentState = State.WRITE_STATUS_TIMEDOUT;
+      LOG.warn("Did not receive the Write Status from all partitions");
+      // Submit the next start commit
+      submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
+              partition.topic(),
+              StringUtils.EMPTY_STRING),
+          RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  private void submitAckCommit() {
+    try {
+      ControlEvent message = new ControlEvent.Builder(
+          ControlEvent.MsgType.ACK_COMMIT,
+          ControlEvent.SenderType.COORDINATOR,
+          currentCommitTime,
+          partition)
+          .setCoordinatorInfo(
+              new ControlEvent.CoordinatorInfo(globalCommittedKafkaOffsets))
+          .build();
+      kafkaControlClient.publishMessage(message);
+    } catch (Exception exception) {
+      LOG.warn(String.format("Could not send ACK_COMMIT message for partition %s and commitTime %s", partition, currentCommitTime), exception);
+    }
+    currentState = State.ACKED_COMMIT;
+
+    // Submit the next start commit
+    submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
+            partition.topic(),
+            StringUtils.EMPTY_STRING),
+        START_COMMIT_INIT_DELAY_MS, TimeUnit.MILLISECONDS);
+  }
+
+  private void initializeGlobalCommittedKafkaOffsets() {
+    try {
+      Map<String, String> commitMetadata = transactionServices.fetchLatestExtraCommitMetadata();
+      String latestKafkaOffsets = commitMetadata.get(KAFKA_OFFSET_KEY);
+      if (!StringUtils.isNullOrEmpty(latestKafkaOffsets)) {
+        LOG.info("Retrieved Raw Kafka offsets from Hudi Commit File " + latestKafkaOffsets);
+        globalCommittedKafkaOffsets = Arrays.stream(latestKafkaOffsets.split(KAFKA_OFFSET_DELIMITER))
+            .map(entry -> entry.split(KAFKA_OFFSET_KV_DELIMITER))
+            .collect(Collectors.toMap(entry -> Integer.parseInt(entry[0]), entry -> Long.parseLong(entry[1])));
+        LOG.info("Initialized the kafka offset commits " + globalCommittedKafkaOffsets);
+      }
+    } catch (Exception exception) {
+      throw new HoodieException("Could not deserialize the kafka commit offsets", exception);
+    }
+  }
+
+  private Map<String, String> transformKafkaOffsets(Map<Integer, Long> kafkaOffsets) {
+    try {
+      String kafkaOffsetValue = kafkaOffsets.keySet().stream()
+          .map(key -> key + KAFKA_OFFSET_KV_DELIMITER + kafkaOffsets.get(key))
+          .collect(Collectors.joining(KAFKA_OFFSET_DELIMITER));
+      return Collections.singletonMap(KAFKA_OFFSET_KEY, kafkaOffsetValue);
+    } catch (Exception exception) {
+      throw new HoodieException("Could not serialize the kafka commit offsets", exception);
+    }
+  }
+
+  private enum State {
+    INIT,
+    STARTED_COMMIT,
+    ENDED_COMMIT,
+    WRITE_STATUS_RCVD,
+    WRITE_STATUS_TIMEDOUT,
+    ACKED_COMMIT,
+  }
+
+  /**
+   * Dynamically provides the latest number of partitions of a Kafka Topic.
+   */
+  public interface KafkaPartitionProvider {
+    int getLatestNumPartitions(String bootstrapServers, String topicName);
+  }
+}
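
The coordinator persists the per-partition Kafka offsets in the extra commit metadata under the key kafka.commit.offsets (encoded as "partition=offset" pairs joined by commas), and refreshes the partition count through the KafkaPartitionProvider at the start of every commit. Since that interface has a single method, a fixed provider (hypothetical, test-only) can stand in for the broker lookup, as in this sketch:

import org.apache.hudi.connect.transaction.ConnectTransactionCoordinator;

public class FixedPartitionProviderExample {
  public static void main(String[] args) {
    // Hypothetical fixed provider for tests; the production wiring uses
    // KafkaConnectUtils::getLatestNumPartitions to query the broker before each commit.
    ConnectTransactionCoordinator.KafkaPartitionProvider fixedProvider =
        (bootstrapServers, topicName) -> 4;

    System.out.println(fixedProvider.getLatestNumPartitions("localhost:9092", "hudi-test-topic"));
  }
}
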
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
new file mode 100644
index 0000000..fe1996e
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.transaction;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.connect.kafka.KafkaControlAgent;
+import org.apache.hudi.connect.writers.ConnectWriterProvider;
+import org.apache.hudi.connect.writers.KafkaConnectConfigs;
+import org.apache.hudi.connect.writers.KafkaConnectWriterProvider;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implementation of the {@link TransactionParticipant} that coordinates the Hudi write transactions
+ * based on events from the {@link TransactionCoordinator} and manages the Hudi Writes for a specific Kafka Partition.
+ */
+public class ConnectTransactionParticipant implements TransactionParticipant {
+
+  private static final Logger LOG = LogManager.getLogger(ConnectTransactionParticipant.class);
+
+  private final LinkedList<SinkRecord> buffer;
+  private final BlockingQueue<ControlEvent> controlEvents;
+  private final TopicPartition partition;
+  private final SinkTaskContext context;
+  private final KafkaControlAgent kafkaControlAgent;
+  private final ConnectWriterProvider<WriteStatus> writerProvider;
+
+  private TransactionInfo<WriteStatus> ongoingTransactionInfo;
+  private long committedKafkaOffset;
+
+  public ConnectTransactionParticipant(KafkaConnectConfigs configs,
+                                       TopicPartition partition,
+                                       KafkaControlAgent kafkaControlAgent,
+                                       SinkTaskContext context) throws HoodieException {
+    this(partition, kafkaControlAgent, context, new KafkaConnectWriterProvider(configs, partition));
+  }
+
+  public ConnectTransactionParticipant(TopicPartition partition,
+                                       KafkaControlAgent kafkaControlAgent,
+                                       SinkTaskContext context,
+                                       ConnectWriterProvider<WriteStatus> writerProvider) throws HoodieException {
+    this.buffer = new LinkedList<>();
+    this.controlEvents = new LinkedBlockingQueue<>();
+    this.partition = partition;
+    this.context = context;
+    this.writerProvider = writerProvider;
+    this.kafkaControlAgent = kafkaControlAgent;
+    this.ongoingTransactionInfo = null;
+    this.committedKafkaOffset = 0;
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Start Hudi Transaction Participant for partition " + partition.partition());
+    this.kafkaControlAgent.registerTransactionParticipant(this);
+    context.pause(partition);
+  }
+
+  @Override
+  public void stop() {
+    this.kafkaControlAgent.deregisterTransactionParticipant(this);
+    cleanupOngoingTransaction();
+  }
+
+  @Override
+  public void buffer(SinkRecord record) {
+    buffer.add(record);
+  }
+
+  @Override
+  public void processControlEvent(ControlEvent message) {
+    controlEvents.add(message);
+  }
+
+  @Override
+  public long getLastKafkaCommittedOffset() {
+    return committedKafkaOffset;
+  }
+
+  @Override
+  public TopicPartition getPartition() {
+    return partition;
+  }
+
+  @Override
+  public void processRecords() {
+    while (!controlEvents.isEmpty()) {
+      ControlEvent message = controlEvents.poll();
+      switch (message.getMsgType()) {
+        case START_COMMIT:
+          handleStartCommit(message);
+          break;
+        case END_COMMIT:
+          handleEndCommit(message);
+          break;
+        case ACK_COMMIT:
+          handleAckCommit(message);
+          break;
+        case WRITE_STATUS:
+          // ignore WRITE_STATUS events since they are only processed by the coordinator
+          break;
+        default:
+          throw new IllegalStateException("HudiTransactionParticipant received incorrect state " + message.getMsgType());
+      }
+    }
+
+    writeRecords();
+  }
+
+  private void handleStartCommit(ControlEvent message) {
+    // If there is an existing/ongoing transaction locally, it must have failed globally,
+    // since we received another START_COMMIT instead of an END_COMMIT or ACK_COMMIT.
+    // Close it and start a new transaction.
+    cleanupOngoingTransaction();
+    // Resync the last committed Kafka offset from the leader
+    syncKafkaOffsetWithLeader(message);
+    context.resume(partition);
+    String currentCommitTime = message.getCommitTime();
+    LOG.info("Started a new transaction after receiving START_COMMIT for commit " + currentCommitTime);
+    try {
+      ongoingTransactionInfo = new TransactionInfo<>(currentCommitTime, writerProvider.getWriter(currentCommitTime));
+      ongoingTransactionInfo.setLastWrittenKafkaOffset(committedKafkaOffset);
+    } catch (Exception exception) {
+      LOG.warn("Error received while starting a new transaction", exception);
+    }
+  }
+
+  private void handleEndCommit(ControlEvent message) {
+    if (ongoingTransactionInfo == null) {
+      LOG.warn(String.format("END_COMMIT %s is received while we were NOT in active transaction", message.getCommitTime()));
+      return;
+    } else if (!ongoingTransactionInfo.getCommitTime().equals(message.getCommitTime())) {
+      LOG.error(String.format("Fatal error received END_COMMIT with commit time %s while local transaction commit time %s",
+          message.getCommitTime(), ongoingTransactionInfo.getCommitTime()));
+      // Recovery: an END_COMMIT from the coordinator interrupted an existing transaction;
+      // explicitly reset the Kafka commit offset to ensure no data loss
+      cleanupOngoingTransaction();
+      syncKafkaOffsetWithLeader(message);
+      return;
+    }
+
+    // Send the WRITE_STATUS message and wait for the ACK_COMMIT asynchronously
+    try {
+      context.pause(partition);
+      ongoingTransactionInfo.commitInitiated();
+      // Close the writer to collect the write statuses, then publish them to the coordinator
+      List<WriteStatus> writeStatuses = new ArrayList<>();
+      try {
+        writeStatuses = ongoingTransactionInfo.getWriter().close();
+      } catch (IOException exception) {
+        LOG.warn("Error closing the Hudi Writer", exception);
+      }
+
+      ControlEvent writeStatus = new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS,
+          ControlEvent.SenderType.PARTICIPANT, ongoingTransactionInfo.getCommitTime(), partition)
+          .setParticipantInfo(new ControlEvent.ParticipantInfo(
+              writeStatuses,
+              ongoingTransactionInfo.getLastWrittenKafkaOffset(),
+              ControlEvent.OutcomeType.WRITE_SUCCESS))
+          .build();
+      kafkaControlAgent.publishMessage(writeStatus);
+    } catch (Exception exception) {
+      LOG.warn(String.format("Error ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
+    }
+  }
+
+  private void handleAckCommit(ControlEvent message) {
+    // Update the locally committed Kafka offset.
+    if (ongoingTransactionInfo != null && committedKafkaOffset < ongoingTransactionInfo.getLastWrittenKafkaOffset()) {
+      committedKafkaOffset = ongoingTransactionInfo.getLastWrittenKafkaOffset();
+    }
+    syncKafkaOffsetWithLeader(message);
+    cleanupOngoingTransaction();
+  }
+
+  private void writeRecords() {
+    if (ongoingTransactionInfo != null && !ongoingTransactionInfo.isCommitInitiated()) {
+      while (!buffer.isEmpty()) {
+        try {
+          SinkRecord record = buffer.peek();
+          if (record != null
+              && record.kafkaOffset() >= ongoingTransactionInfo.getLastWrittenKafkaOffset()) {
+            ongoingTransactionInfo.getWriter().writeRecord(record);
+            ongoingTransactionInfo.setLastWrittenKafkaOffset(record.kafkaOffset() + 1);
+          } else if (record != null && record.kafkaOffset() < committedKafkaOffset) {
+            LOG.warn(String.format("Received a kafka record with offset %s prior to last committed offset %s for partition %s",
+                record.kafkaOffset(), ongoingTransactionInfo.getLastWrittenKafkaOffset(),
+                partition));
+          }
+          buffer.poll();
+        } catch (Exception exception) {
+          LOG.warn(String.format("Error received while writing records for transaction %s in partition %s",
+              ongoingTransactionInfo.getCommitTime(), partition.partition()),
+              exception);
+        }
+      }
+    }
+  }
+
+  private void cleanupOngoingTransaction() {
+    if (ongoingTransactionInfo != null) {
+      try {
+        ongoingTransactionInfo.getWriter().close();
+        ongoingTransactionInfo = null;
+      } catch (IOException exception) {
+        LOG.warn("Error received while trying to cleanup existing transaction", exception);
+      }
+    }
+  }
+
+  private void syncKafkaOffsetWithLeader(ControlEvent message) {
+    if (message.getCoordinatorInfo() != null) {
+      Long coordinatorCommittedKafkaOffset = message.getCoordinatorInfo().getGlobalKafkaCommitOffsets().get(partition.partition());
+      // Recover kafka committed offsets, treating the commit offset from the coordinator
+      // as the source of truth
+      if (coordinatorCommittedKafkaOffset != null && coordinatorCommittedKafkaOffset >= 0) {
+        if (coordinatorCommittedKafkaOffset != committedKafkaOffset) {
+          LOG.warn(String.format("Recovering the kafka offset for partition %s to offset %s instead of local offset %s",
+              partition.partition(), coordinatorCommittedKafkaOffset, committedKafkaOffset));
+          context.offset(partition, coordinatorCommittedKafkaOffset);
+        }
+        committedKafkaOffset = coordinatorCommittedKafkaOffset;
+      }
+    }
+  }
+}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java
new file mode 100644
index 0000000..0930648
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.transaction;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.util.SerializationUtils;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * The events sent over the Kafka Control Topic between the
+ * coordinator and the participants, in order to coordinate
+ * the writes across all the Kafka partitions.
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class ControlEvent implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(ControlEvent.class);
+  private static final int CURRENT_VERSION = 0;
+
+  private final int version = CURRENT_VERSION;
+  private MsgType msgType;
+  private SenderType senderType;
+  private String commitTime;
+  private byte[] senderPartition;
+  private CoordinatorInfo coordinatorInfo;
+  private ParticipantInfo participantInfo;
+
+  public ControlEvent() {
+  }
+
+  public ControlEvent(MsgType msgType,
+                      SenderType senderType,
+                      String commitTime,
+                      byte[] senderPartition,
+                      CoordinatorInfo coordinatorInfo,
+                      ParticipantInfo participantInfo) {
+    this.msgType = msgType;
+    this.senderType = senderType;
+    this.commitTime = commitTime;
+    this.senderPartition = senderPartition;
+    this.coordinatorInfo = coordinatorInfo;
+    this.participantInfo = participantInfo;
+  }
+
+  public String key() {
+    return msgType.name().toLowerCase(Locale.ROOT);
+  }
+
+  public MsgType getMsgType() {
+    return msgType;
+  }
+
+  public SenderType getSenderType() {
+    return senderType;
+  }
+
+  public String getCommitTime() {
+    return commitTime;
+  }
+
+  public byte[] getSenderPartition() {
+    return senderPartition;
+  }
+
+  public TopicPartition senderPartition() {
+    return SerializationUtils.deserialize(senderPartition);
+  }
+
+  public CoordinatorInfo getCoordinatorInfo() {
+    return coordinatorInfo;
+  }
+
+  public ParticipantInfo getParticipantInfo() {
+    return participantInfo;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s %s %s %s %s %s", version, msgType.name(), commitTime,
+        Arrays.toString(senderPartition), coordinatorInfo.toString(), participantInfo.toString());
+  }
+
+  /**
+   * Builder that helps build {@link ControlEvent}.
+   */
+  public static class Builder {
+
+    private final MsgType msgType;
+    private SenderType senderType;
+    private final String commitTime;
+    private final byte[] senderPartition;
+    private CoordinatorInfo coordinatorInfo;
+    private ParticipantInfo participantInfo;
+
+    public Builder(MsgType msgType, SenderType senderType, String commitTime, TopicPartition senderPartition) throws IOException {
+      this.msgType = msgType;
+      this.senderType = senderType;
+      this.commitTime = commitTime;
+      this.senderPartition = SerializationUtils.serialize(senderPartition);
+    }
+
+    public Builder setCoordinatorInfo(CoordinatorInfo coordinatorInfo) {
+      this.coordinatorInfo = coordinatorInfo;
+      return this;
+    }
+
+    public Builder setParticipantInfo(ParticipantInfo participantInfo) {
+      this.participantInfo = participantInfo;
+      return this;
+    }
+
+    public ControlEvent build() {
+      return new ControlEvent(msgType, senderType, commitTime, senderPartition, coordinatorInfo, participantInfo);
+    }
+  }
+
+  /**
+   * The info sent by the {@link TransactionCoordinator} to one or more
+   * {@link TransactionParticipant}s.
+   */
+  public static class CoordinatorInfo implements Serializable {
+
+    private Map<Integer, Long> globalKafkaCommitOffsets;
+
+    public CoordinatorInfo() {
+    }
+
+    public CoordinatorInfo(Map<Integer, Long> globalKafkaCommitOffsets) {
+      this.globalKafkaCommitOffsets = globalKafkaCommitOffsets;
+    }
+
+    public Map<Integer, Long> getGlobalKafkaCommitOffsets() {
+      return (globalKafkaCommitOffsets == null) ? new HashMap<>() : globalKafkaCommitOffsets;
+    }
+  }
+
+  /**
+   * The info sent by {@link TransactionParticipant} instances to the
+   * {@link TransactionCoordinator}.
+   */
+  public static class ParticipantInfo implements Serializable {
+
+    private byte[] writeStatusList;
+    private long kafkaCommitOffset;
+    private OutcomeType outcomeType;
+
+    public ParticipantInfo() {
+    }
+
+    public ParticipantInfo(List<WriteStatus> writeStatuses, long kafkaCommitOffset, OutcomeType outcomeType) throws IOException {
+      this.writeStatusList = SerializationUtils.serialize(writeStatuses);
+      this.kafkaCommitOffset = kafkaCommitOffset;
+      this.outcomeType = outcomeType;
+    }
+
+    public byte[] getWriteStatusList() {
+      return writeStatusList;
+    }
+
+    public List<WriteStatus> writeStatuses() {
+      return SerializationUtils.deserialize(writeStatusList);
+    }
+
+    public long getKafkaCommitOffset() {
+      return kafkaCommitOffset;
+    }
+
+    public OutcomeType getOutcomeType() {
+      return outcomeType;
+    }
+  }
+
+  /**
+   * Type of Control Event.
+   */
+  public enum MsgType {
+    START_COMMIT,
+    END_COMMIT,
+    ACK_COMMIT,
+    WRITE_STATUS,
+  }
+
+  public enum SenderType {
+    COORDINATOR,
+    PARTICIPANT
+  }
+
+  public enum OutcomeType {
+    WRITE_SUCCESS,
+  }
+}
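
For illustration, a participant-side WRITE_STATUS event can be assembled with the builder as follows; the commit time, offset, and topic/partition are placeholder values and the write status list is left empty for brevity:

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.kafka.common.TopicPartition;

public class ControlEventBuilderExample {
  public static void main(String[] args) throws IOException {
    TopicPartition senderPartition = new TopicPartition("hudi-test-topic", 0);

    ControlEvent writeStatusEvent = new ControlEvent.Builder(
        ControlEvent.MsgType.WRITE_STATUS,
        ControlEvent.SenderType.PARTICIPANT,
        "20210910182026",                       // placeholder commit time
        senderPartition)
        .setParticipantInfo(new ControlEvent.ParticipantInfo(
            new ArrayList<WriteStatus>(),       // write statuses returned by the Hudi writer
            123L,                               // last written Kafka offset (placeholder)
            ControlEvent.OutcomeType.WRITE_SUCCESS))
        .build();

    // The message key on the control topic is the lower-cased message type, i.e. "write_status".
    System.out.println(writeStatusEvent.key());
  }
}
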
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java
new file mode 100644
index 0000000..a0e2654
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.transaction;
+
+/**
+ * The events within the Coordinator that trigger
+ * the state changes in the state machine of
+ * the Coordinator.
+ */
+public class CoordinatorEvent {
+
+  private final CoordinatorEventType eventType;
+  private final String topicName;
+  private final String commitTime;
+  private ControlEvent message;
+
+  public CoordinatorEvent(CoordinatorEventType eventType,
+                          String topicName,
+                          String commitTime) {
+    this.eventType = eventType;
+    this.topicName = topicName;
+    this.commitTime = commitTime;
+  }
+
+  public CoordinatorEventType getEventType() {
+    return eventType;
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+  public String getCommitTime() {
+    return commitTime;
+  }
+
+  public ControlEvent getMessage() {
+    return message;
+  }
+
+  public void setMessage(ControlEvent message) {
+    this.message = message;
+  }
+
+  /**
+   * The type of Coordinator Event.
+   */
+  public enum CoordinatorEventType {
+    START_COMMIT,
+    END_COMMIT,
+    WRITE_STATUS,
+    ACK_COMMIT,
+    WRITE_STATUS_TIMEOUT
+  }
+}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionCoordinator.java
new file mode 100644
index 0000000..04f8a2e
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionCoordinator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.transaction;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * The base Coordinator interface that coordinates write transactions
+ * across all the Kafka partitions of a topic, each of which is
+ * managed by a {@link TransactionParticipant}.
+ */
+public interface TransactionCoordinator {
+
+  void start();
+
+  void stop();
+
+  /* Kafka Topic that this Coordinator belongs to */
+  TopicPartition getPartition();
+
+  /* Called when a control event is received from the Kafka control topic */
+  void processControlEvent(ControlEvent message);
+}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java
new file mode 100644
index 0000000..9c7bbf1
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.transaction;
+
+import org.apache.hudi.connect.writers.ConnectWriter;
+
+/**
+ * Stores all the state for the current Transaction within a
+ * {@link TransactionParticipant}.
+ * @param <T> The type of status returned by the underlying writer.
+ */
+public class TransactionInfo<T> {
+
+  private final String commitTime;
+  private final ConnectWriter<T> writer;
+  private long lastWrittenKafkaOffset;
+  private boolean commitInitiated;
+
+  public TransactionInfo(String commitTime, ConnectWriter<T> writer) {
+    this.commitTime = commitTime;
+    this.writer = writer;
+    this.lastWrittenKafkaOffset = 0;
+    this.commitInitiated = false;
+  }
+
+  public String getCommitTime() {
+    return commitTime;
+  }
+
+  public ConnectWriter<T> getWriter() {
+    return writer;
+  }
+
+  public long getLastWrittenKafkaOffset() {
+    return lastWrittenKafkaOffset;
+  }
+
+  public boolean isCommitInitiated() {
+    return commitInitiated;
+  }
+
+  public void setLastWrittenKafkaOffset(long lastWrittenKafkaOffset) {
+    this.lastWrittenKafkaOffset = lastWrittenKafkaOffset;
+  }
+
+  public void commitInitiated() {
+    this.commitInitiated = true;
+  }
+}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java
new file mode 100644
index 0000000..0179f3b
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.transaction;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+/**
+ * Interface for a Participant that manages writes for a single Kafka partition,
+ * based on coordination signals from the {@link TransactionCoordinator}.
+ */
+public interface TransactionParticipant {
+
+  void start();
+
+  void stop();
+
+  void buffer(SinkRecord record);
+
+  void processRecords();
+
+  TopicPartition getPartition();
+
+  void processControlEvent(ControlEvent message);
+
+  long getLastKafkaCommittedOffset();
+}
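
For reference, the following is a minimal, hypothetical sketch of the TransactionParticipant contract above; it only buffers records and tracks the last processed offset, whereas the real ConnectTransactionParticipant in this patch additionally registers with the Kafka control agent, reacts to coordinator control events, and drives a Hudi writer. The package and class name below are illustrative only.

package org.apache.hudi.connect.examples;

import org.apache.hudi.connect.transaction.ControlEvent;
import org.apache.hudi.connect.transaction.TransactionParticipant;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical no-op participant that only buffers records and tracks the last
 * processed Kafka offset; shown purely to illustrate the interface contract.
 */
public class NoOpTransactionParticipant implements TransactionParticipant {

  private final TopicPartition partition;
  private final Deque<SinkRecord> buffer = new ArrayDeque<>();
  private long lastKafkaCommittedOffset = 0L;

  public NoOpTransactionParticipant(TopicPartition partition) {
    this.partition = partition;
  }

  @Override
  public void start() {
    // A real participant registers itself with the Kafka control agent here.
  }

  @Override
  public void stop() {
    // A real participant deregisters and cleans up its writer here.
    buffer.clear();
  }

  @Override
  public void buffer(SinkRecord record) {
    buffer.add(record);
  }

  @Override
  public void processRecords() {
    // A real participant drains the buffer into the active Hudi writer.
    while (!buffer.isEmpty()) {
      lastKafkaCommittedOffset = buffer.poll().kafkaOffset();
    }
  }

  @Override
  public TopicPartition getPartition() {
    return partition;
  }

  @Override
  public void processControlEvent(ControlEvent message) {
    // A real participant starts/ends its local transaction based on the
    // START_COMMIT, END_COMMIT and ACK_COMMIT signals from the coordinator.
  }

  @Override
  public long getLastKafkaCommittedOffset() {
    return lastKafkaCommittedOffset;
  }
}
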
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
new file mode 100644
index 0000000..593cfb1
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.utils;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.CustomAvroKeyGenerator;
+import org.apache.hudi.keygen.CustomKeyGenerator;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Helper methods used by the Hudi Kafka Connect Sink for Kafka and Hudi table operations.
+ */
+public class KafkaConnectUtils {
+
+  private static final Logger LOG = LogManager.getLogger(KafkaConnectUtils.class);
+
+  public static int getLatestNumPartitions(String bootstrapServers, String topicName) {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", bootstrapServers);
+    // Use try-with-resources so the AdminClient is always closed
+    try (AdminClient client = AdminClient.create(props)) {
+      DescribeTopicsResult result = client.describeTopics(Arrays.asList(topicName));
+      Map<String, KafkaFuture<TopicDescription>> values = result.values();
+      KafkaFuture<TopicDescription> topicDescription = values.get(topicName);
+      int numPartitions = topicDescription.get().partitions().size();
+      LOG.info(String.format("Latest number of partitions for topic %s is %s", topicName, numPartitions));
+      return numPartitions;
+    } catch (Exception exception) {
+      throw new HoodieException("Fatal error fetching the latest number of partitions for Kafka topic " + topicName, exception);
+    }
+  }
+
+  /**
+   * Builds the default Hadoop configuration used by the connector.
+   *
+   * @return the default Hadoop {@link Configuration}.
+   */
+  public static Configuration getDefaultHadoopConf() {
+    Configuration hadoopConf = new Configuration();
+    hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+    return hadoopConf;
+  }
+
+  /**
+   * Extracts the record key fields.
+   *
+   * @param keyGenerator instance of the {@link KeyGenerator} in use.
+   * @return the record key columns, separated by commas.
+   */
+  public static String getRecordKeyColumns(KeyGenerator keyGenerator) {
+    return String.join(",", keyGenerator.getRecordKeyFieldNames());
+  }
+
+  /**
+   * Extracts partition columns directly if the key generator is an instance of
+   * {@link BaseKeyGenerator}, else extracts the partition columns from the properties.
+   *
+   * @param keyGenerator instance of the {@link KeyGenerator} in use.
+   * @param typedProperties properties from the config.
+   * @return the partition columns, separated by commas.
+   */
+  public static String getPartitionColumns(KeyGenerator keyGenerator, TypedProperties typedProperties) {
+
+    if (keyGenerator instanceof CustomKeyGenerator || keyGenerator instanceof CustomAvroKeyGenerator) {
+      return ((BaseKeyGenerator) keyGenerator).getPartitionPathFields().stream().map(
+          pathField -> Arrays.stream(pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX))
+              .findFirst().orElse("Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
+          .collect(Collectors.joining(","));
+    }
+
+    if (keyGenerator instanceof BaseKeyGenerator) {
+      return String.join(",", ((BaseKeyGenerator) keyGenerator).getPartitionPathFields());
+    }
+
+    return typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
+  }
+
+
+  /**
+   * Gets the commit metadata from the latest completed instant on the timeline.
+   *
+   * @param metaClient the {@link HoodieTableMetaClient} used to access the table metadata.
+   * @return an {@link Option} of {@link HoodieCommitMetadata} containing the metadata of the latest commit.
+   */
+  public static Option<HoodieCommitMetadata> getCommitMetadataForLatestInstant(HoodieTableMetaClient metaClient) {
+    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .filter(instant -> (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE && instant.getAction().equals(HoodieActiveTimeline.COMMIT_ACTION))
+            || (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ && instant.getAction().equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
+        );
+    Option<HoodieInstant> latestInstant = timeline.lastInstant();
+    if (latestInstant.isPresent()) {
+      try {
+        byte[] data = timeline.getInstantDetails(latestInstant.get()).get();
+        return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
+      } catch (Exception e) {
+        throw new HoodieException("Failed to read schema from commit metadata", e);
+      }
+    } else {
+      return Option.empty();
+    }
+  }
+}
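
A possible usage sketch of the helpers above, assuming a locally running Kafka broker and a key generator created through HoodieAvroKeyGeneratorFactory, as done elsewhere in this patch; the topic name and field names are made up for illustration.

package org.apache.hudi.connect.examples;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;

public class KafkaConnectUtilsExample {

  public static void main(String[] args) throws Exception {
    // Number of partitions of the data topic, used by the coordinator to know
    // how many participants it should expect write statuses from.
    int numPartitions = KafkaConnectUtils.getLatestNumPartitions("localhost:9092", "hudi-test-topic");

    // Derive record key and partition columns from the configured key generator.
    TypedProperties props = new TypedProperties();
    props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "volume");
    props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "date");
    KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(props);

    System.out.println("partitions=" + numPartitions
        + ", recordKeyColumns=" + KafkaConnectUtils.getRecordKeyColumns(keyGenerator)
        + ", partitionColumns=" + KafkaConnectUtils.getPartitionColumns(keyGenerator, props));
  }
}
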
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
new file mode 100644
index 0000000..c958b2b
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.writers;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+/**
+ * Base Hudi Writer that manages reading the raw Kafka records and
+ * converting them to {@link HoodieRecord}s that can be written to Hudi by
+ * the derived implementations of this class.
+ */
+public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus> {
+
+  public static final String KAFKA_AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter";
+  public static final String KAFKA_JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
+  public static final String KAFKA_STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter";
+  private static final Logger LOG = LogManager.getLogger(AbstractConnectWriter.class);
+
+  private final KafkaConnectConfigs connectConfigs;
+  private final KeyGenerator keyGenerator;
+  private final SchemaProvider schemaProvider;
+
+  public AbstractConnectWriter(KafkaConnectConfigs connectConfigs,
+                               KeyGenerator keyGenerator,
+                               SchemaProvider schemaProvider) {
+    this.connectConfigs = connectConfigs;
+    this.keyGenerator = keyGenerator;
+    this.schemaProvider = schemaProvider;
+  }
+
+  @Override
+  public void writeRecord(SinkRecord record) throws IOException {
+    AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceSchema());
+    Option<GenericRecord> avroRecord;
+    switch (connectConfigs.getKafkaValueConverter()) {
+      case KAFKA_AVRO_CONVERTER:
+        avroRecord = Option.of((GenericRecord) record.value());
+        break;
+      case KAFKA_STRING_CONVERTER:
+        avroRecord = Option.of(convertor.fromJson((String) record.value()));
+        break;
+      case KAFKA_JSON_CONVERTER:
+        throw new UnsupportedEncodingException("Currently JSON objects are not supported");
+      default:
+        throw new IOException("Unsupported Kafka Format type (" + connectConfigs.getKafkaValueConverter() + ")");
+    }
+
+    HoodieRecord hoodieRecord = new HoodieRecord<>(keyGenerator.getKey(avroRecord.get()), new HoodieAvroPayload(avroRecord));
+    writeHudiRecord(hoodieRecord);
+  }
+
+  @Override
+  public List<WriteStatus> close() {
+    return flushHudiRecords();
+  }
+
+  protected abstract void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record);
+
+  protected abstract List<WriteStatus> flushHudiRecords();
+}
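
As a sketch of the two extension points above (writeHudiRecord and flushHudiRecords), a hypothetical trivial subclass could simply collect records in memory and skip the actual write; the BufferedConnectWriter that follows is the real implementation, spilling to disk and bulk-inserting through the Hudi Java write client. The class below is illustrative only.

package org.apache.hudi.connect.examples;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.connect.writers.AbstractConnectWriter;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical writer that keeps converted records in memory and performs no
 * actual Hudi write; it only illustrates the contract of AbstractConnectWriter.
 */
public class InMemoryConnectWriter extends AbstractConnectWriter {

  private final List<HoodieRecord<HoodieAvroPayload>> records = new ArrayList<>();

  public InMemoryConnectWriter(KafkaConnectConfigs connectConfigs,
                               KeyGenerator keyGenerator,
                               SchemaProvider schemaProvider) {
    super(connectConfigs, keyGenerator, schemaProvider);
  }

  @Override
  protected void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record) {
    // Records arrive here already keyed and wrapped as HoodieAvroPayloads.
    records.add(record);
  }

  @Override
  protected List<WriteStatus> flushHudiRecords() {
    // A real writer hands the buffered records to a Hudi write client
    // and returns the resulting write statuses.
    records.clear();
    return Collections.emptyList();
  }
}
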
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
new file mode 100644
index 0000000..3319604
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.writers;
+
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.schema.SchemaProvider;
+
+import org.apache.avro.Schema;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Specific implementation of a Hudi Writer that buffers all incoming records,
+ * and writes them to Hudi files at the end of a transaction using Bulk Insert.
+ */
+public class BufferedConnectWriter extends AbstractConnectWriter {
+
+  private static final Logger LOG = LogManager.getLogger(BufferedConnectWriter.class);
+
+  private final HoodieEngineContext context;
+  private final HoodieJavaWriteClient writeClient;
+  private final String instantTime;
+  private final HoodieWriteConfig config;
+  private ExternalSpillableMap<String, HoodieRecord<HoodieAvroPayload>> bufferedRecords;
+
+  public BufferedConnectWriter(HoodieEngineContext context,
+                               HoodieJavaWriteClient writeClient,
+                               String instantTime,
+                               KafkaConnectConfigs connectConfigs,
+                               HoodieWriteConfig config,
+                               KeyGenerator keyGenerator,
+                               SchemaProvider schemaProvider) {
+    super(connectConfigs, keyGenerator, schemaProvider);
+    this.context = context;
+    this.writeClient = writeClient;
+    this.instantTime = instantTime;
+    this.config = config;
+    init();
+  }
+
+  private void init() {
+    try {
+      // Buffer all incoming records in a spillable map that can overflow from memory to disk
+      long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(context.getTaskContextSupplier(), config);
+      LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
+      this.bufferedRecords = new ExternalSpillableMap<>(memoryForMerge,
+          config.getSpillableMapBasePath(),
+          new DefaultSizeEstimator(),
+          new HoodieRecordSizeEstimator(new Schema.Parser().parse(config.getSchema())),
+          config.getCommonConfig().getSpillableDiskMapType(),
+          config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
+    } catch (IOException io) {
+      throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
+    }
+  }
+
+  @Override
+  public void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record) {
+    bufferedRecords.put(record.getRecordKey(), record);
+  }
+
+  @Override
+  public List<WriteStatus> flushHudiRecords() {
+    try {
+      LOG.info("Number of entries in MemoryBasedMap => "
+          + bufferedRecords.getInMemoryMapNumEntries()
+          + "Total size in bytes of MemoryBasedMap => "
+          + bufferedRecords.getCurrentInMemoryMapSize() + "Number of entries in BitCaskDiskMap => "
+          + bufferedRecords.getDiskBasedMapNumEntries() + "Size of file spilled to disk => "
+          + bufferedRecords.getSizeOfFileOnDiskInBytes());
+      List<WriteStatus> writeStatuses = new ArrayList<>();
+      // Write out all records if non-empty
+      if (!bufferedRecords.isEmpty()) {
+        writeStatuses = writeClient.bulkInsertPreppedRecords(
+          bufferedRecords.values().stream().collect(Collectors.toList()),
+        instantTime, Option.empty());
+      }
+      bufferedRecords.close();
+      LOG.info("Flushed hudi records and got writeStatuses: "
+          + writeStatuses);
+      return writeStatuses;
+    } catch (Exception e) {
+      throw new HoodieException("Write records failed", e);
+    }
+  }
+}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectTransactionServices.java
new file mode 100644
index 0000000..b36e1f1
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectTransactionServices.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.writers;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.connect.transaction.TransactionCoordinator;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Transaction service APIs used by
+ * {@link TransactionCoordinator}.
+ */
+public interface ConnectTransactionServices {
+
+  String startCommit();
+
+  void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata);
+
+  Map<String, String> fetchLatestExtraCommitMetadata();
+}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectWriter.java
new file mode 100644
index 0000000..a90d72a
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.writers;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.io.IOException;
+import java.util.List;
+
+public interface ConnectWriter<T> {
+
+  void writeRecord(SinkRecord record) throws IOException;
+
+  List<T> close() throws IOException;
+}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectWriterProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectWriterProvider.java
new file mode 100644
index 0000000..87deedc
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/ConnectWriterProvider.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.writers;
+
+public interface ConnectWriterProvider<T> {
+
+  ConnectWriter<T> getWriter(String commitTime);
+}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
new file mode 100644
index 0000000..ae6b5d1
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.writers;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.schema.FilebasedSchemaProvider;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Class storing configs for the Hudi Kafka Connect Sink Connector.
+ */
+@Immutable
+@ConfigClassProperty(name = "Kafka Sink Connect Configurations",
+    groupName = ConfigGroups.Names.KAFKA_CONNECT,
+    description = "Configurations for Kakfa Connect Sink Connector for Hudi.")
+public class KafkaConnectConfigs extends HoodieConfig {
+
+  public static final String KAFKA_VALUE_CONVERTER = "value.converter";
+
+  public static final ConfigProperty<String> KAFKA_BOOTSTRAP_SERVERS = ConfigProperty
+      .key("bootstrap.servers")
+      .defaultValue("localhost:9092")
+      .withDocumentation("The bootstrap servers for the Kafka Cluster.");
+
+  public static final ConfigProperty<String> CONTROL_TOPIC_NAME = ConfigProperty
+      .key("hoodie.kafka.control.topic")
+      .defaultValue("hudi-control-topic")
+      .withDocumentation("Kafka topic name used by the Hudi Sink Connector for "
+          + "sending and receiving control messages. Not used for data records.");
+
+  public static final ConfigProperty<String> SCHEMA_PROVIDER_CLASS = ConfigProperty
+      .key("hoodie.schemaprovider.class")
+      .defaultValue(FilebasedSchemaProvider.class.getName())
+      .withDocumentation("subclass of org.apache.hudi.schema.SchemaProvider "
+          + "to attach schemas to input & target table data, built in options: "
+          + "org.apache.hudi.schema.FilebasedSchemaProvider.");
+
+  public static final ConfigProperty<String> COMMIT_INTERVAL_SECS = ConfigProperty
+      .key("hoodie.kafka.commit.interval.secs")
+      .defaultValue("60")
+      .withDocumentation("The interval at which Hudi will commit the records written "
+          + "to the files, making them consumable on the read-side.");
+
+  public static final ConfigProperty<String> COORDINATOR_WRITE_TIMEOUT_SECS = ConfigProperty
+      .key("hoodie.kafka.coordinator.write.timeout.secs")
+      .defaultValue("60")
+      .withDocumentation("The timeout after sending an END_COMMIT until when "
+          + "the coordinator will wait for the write statuses from all the partitions"
+          + "to ignore the current commit and start a new commit.");
+
+  public static final ConfigProperty<String> META_SYNC_ENABLE = ConfigProperty
+      .key("hoodie.meta.sync.enable")
+      .defaultValue("false")
+      .withDocumentation("Enable Meta Sync such as Hive");
+
+  public static final ConfigProperty<String> META_SYNC_CLASSES = ConfigProperty
+      .key("hoodie.meta.sync.classes")
+      .defaultValue(HiveSyncTool.class.getName())
+      .withDocumentation("Meta sync client tool, using comma to separate multi tools");
+
+  protected KafkaConnectConfigs() {
+    super();
+  }
+
+  protected KafkaConnectConfigs(Properties props) {
+    super(props);
+  }
+
+  public static KafkaConnectConfigs.Builder newBuilder() {
+    return new KafkaConnectConfigs.Builder();
+  }
+
+  public String getBootstrapServers() {
+    return getString(KAFKA_BOOTSTRAP_SERVERS);
+  }
+
+  public String getControlTopicName() {
+    return getString(CONTROL_TOPIC_NAME);
+  }
+
+  public String getSchemaProviderClass() {
+    return getString(SCHEMA_PROVIDER_CLASS);
+  }
+
+  public Long getCommitIntervalSecs() {
+    return getLong(COMMIT_INTERVAL_SECS);
+  }
+
+  public Long getCoordinatorWriteTimeoutSecs() {
+    return getLong(COORDINATOR_WRITE_TIMEOUT_SECS);
+  }
+
+  public String getKafkaValueConverter() {
+    return getString(KAFKA_VALUE_CONVERTER);
+  }
+
+  public Boolean isMetaSyncEnabled() {
+    return getBoolean(META_SYNC_ENABLE);
+  }
+
+  public String getMetaSyncClasses() {
+    return getString(META_SYNC_CLASSES);
+  }
+
+  public static class Builder {
+
+    protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs();
+
+    public Builder withBootstrapServers(String bootstrapServers) {
+      connectConfigs.setValue(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
+      return this;
+    }
+
+    public Builder withControlTopicName(String controlTopicName) {
+      connectConfigs.setValue(CONTROL_TOPIC_NAME, controlTopicName);
+      return this;
+    }
+
+    public Builder withCommitIntervalSecs(Long commitIntervalSecs) {
+      connectConfigs.setValue(COMMIT_INTERVAL_SECS, String.valueOf(commitIntervalSecs));
+      return this;
+    }
+
+    public Builder withCoordinatorWriteTimeoutSecs(Long coordinatorWriteTimeoutSecs) {
+      connectConfigs.setValue(COORDINATOR_WRITE_TIMEOUT_SECS, String.valueOf(coordinatorWriteTimeoutSecs));
+      return this;
+    }
+
+    // Kafka Connect tasks receive their properties as a Map
+    public Builder withProperties(Map<?, ?> properties) {
+      connectConfigs.getProps().putAll(properties);
+      return this;
+    }
+
+    public Builder withProperties(Properties properties) {
+      connectConfigs.getProps().putAll(properties);
+      return this;
+    }
+
+    protected void setDefaults() {
+      // Fill in default values for any properties that have not been set explicitly
+      connectConfigs.setDefaults(KafkaConnectConfigs.class.getName());
+    }
+
+    public KafkaConnectConfigs build() {
+      setDefaults();
+      // Build the final KafkaConnectConfigs from the accumulated properties
+      return new KafkaConnectConfigs(connectConfigs.getProps());
+    }
+  }
+}
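
A minimal sketch of assembling these configs programmatically with the builder above, assuming the remaining properties (value converter, table name, base path, schema provider settings, etc.) are supplied by the Kafka Connect runtime as a Map; the values shown simply mirror the documented defaults and are illustrative.

package org.apache.hudi.connect.examples;

import org.apache.hudi.connect.writers.KafkaConnectConfigs;

import java.util.HashMap;
import java.util.Map;

public class KafkaConnectConfigsExample {

  public static void main(String[] args) {
    // Properties handed over by the Kafka Connect runtime arrive as a Map
    Map<String, String> taskProps = new HashMap<>();
    taskProps.put("value.converter", "org.apache.kafka.connect.storage.StringConverter");
    taskProps.put("hoodie.kafka.control.topic", "hudi-control-topic");

    KafkaConnectConfigs configs = KafkaConnectConfigs.newBuilder()
        .withBootstrapServers("localhost:9092")
        .withCommitIntervalSecs(60L)
        .withCoordinatorWriteTimeoutSecs(60L)
        .withProperties(taskProps)
        .build();

    System.out.println(configs.getControlTopicName() + " @ " + configs.getBootstrapServers()
        + ", commit every " + configs.getCommitIntervalSecs() + "s"
        + ", value converter: " + configs.getKafkaValueConverter());
  }
}
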
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
new file mode 100644
index 0000000..ad40ebc
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.writers;
+
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.connect.transaction.TransactionCoordinator;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of Transaction service APIs used by
+ * {@link TransactionCoordinator}
+ * using {@link HoodieJavaWriteClient}.
+ */
+public class KafkaConnectTransactionServices implements ConnectTransactionServices {
+
+  private static final Logger LOG = LogManager.getLogger(KafkaConnectTransactionServices.class);
+  private static final String TABLE_FORMAT = "PARQUET";
+
+  private final Option<HoodieTableMetaClient> tableMetaClient;
+  private final Configuration hadoopConf;
+  private final FileSystem fs;
+  private final String tableBasePath;
+  private final String tableName;
+  private final HoodieEngineContext context;
+
+  private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;
+
+  public KafkaConnectTransactionServices(
+      KafkaConnectConfigs connectConfigs) throws HoodieException {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withProperties(connectConfigs.getProps()).build();
+
+    tableBasePath = writeConfig.getBasePath();
+    tableName = writeConfig.getTableName();
+    hadoopConf = KafkaConnectUtils.getDefaultHadoopConf();
+    context = new HoodieJavaEngineContext(hadoopConf);
+    fs = FSUtils.getFs(tableBasePath, hadoopConf);
+
+    try {
+      KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(
+          new TypedProperties(connectConfigs.getProps()));
+
+      String recordKeyFields = KafkaConnectUtils.getRecordKeyColumns(keyGenerator);
+      String partitionColumns = KafkaConnectUtils.getPartitionColumns(keyGenerator,
+          new TypedProperties(connectConfigs.getProps()));
+
+      LOG.info(String.format("Setting record key %s and partitionfields %s for table %s",
+          recordKeyFields,
+          partitionColumns,
+          tableBasePath + tableName));
+
+      tableMetaClient = Option.of(HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+          .setTableName(tableName)
+          .setPayloadClassName(HoodieAvroPayload.class.getName())
+          .setBaseFileFormat(TABLE_FORMAT)
+          .setRecordKeyFields(recordKeyFields)
+          .setPartitionFields(partitionColumns)
+          .setKeyGeneratorClassProp(writeConfig.getKeyGeneratorClass())
+          .initTable(hadoopConf, tableBasePath));
+
+      javaClient = new HoodieJavaWriteClient<>(context, writeConfig);
+    } catch (Exception exception) {
+      throw new HoodieException("Fatal error instantiating Hudi Transaction Services ", exception);
+    }
+  }
+
+  public String startCommit() {
+    String newCommitTime = javaClient.startCommit();
+    javaClient.transitionInflight(newCommitTime);
+    LOG.info("Starting Hudi commit " + newCommitTime);
+    return newCommitTime;
+  }
+
+  public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+    javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata),
+        HoodieActiveTimeline.COMMIT_ACTION, Collections.emptyMap());
+    LOG.info("Ending Hudi commit " + commitTime);
+  }
+
+  public Map<String, String> fetchLatestExtraCommitMetadata() {
+    if (tableMetaClient.isPresent()) {
+      Option<HoodieCommitMetadata> metadata = KafkaConnectUtils.getCommitMetadataForLatestInstant(tableMetaClient.get());
+      if (metadata.isPresent()) {
+        return metadata.get().getExtraMetadata();
+      } else {
+        LOG.info("Hoodie Extra Metadata from latest commit is absent");
+        return Collections.emptyMap();
+      }
+    }
+    throw new HoodieException("Fatal error retrieving Hoodie Extra Metadata since Table Meta Client is absent");
+  }
+}
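
A hedged sketch of how a coordinator might drive the transaction services above for one commit round: recover previous metadata, start a commit, attach the per-partition Kafka offsets as extra metadata, and end the commit with the collected write statuses. The metadata key prefix used below is made up for illustration and is not the key format used by the connector.

package org.apache.hudi.connect.examples;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.connect.writers.ConnectTransactionServices;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CommitLifecycleExample {

  public static void commitOnce(ConnectTransactionServices transactionServices,
                                List<WriteStatus> writeStatuses,
                                Map<Integer, Long> kafkaOffsetsByPartition) {
    // Previously committed offsets can be recovered from the last commit's metadata
    Map<String, String> lastMetadata = transactionServices.fetchLatestExtraCommitMetadata();

    String commitTime = transactionServices.startCommit();

    // Record the latest Kafka offsets so a restarted coordinator can resume from them
    // (the "kafka.commit.offset." prefix here is a hypothetical example key).
    Map<String, String> extraMetadata = new HashMap<>(lastMetadata);
    kafkaOffsetsByPartition.forEach((partition, offset) ->
        extraMetadata.put("kafka.commit.offset." + partition, String.valueOf(offset)));

    transactionServices.endCommit(commitTime, new ArrayList<>(writeStatuses), extraMetadata);
  }
}
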
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
new file mode 100644
index 0000000..9d007dd
--- /dev/null
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect.writers;
+
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.connect.KafkaConnectFileIdPrefixProvider;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
+import org.apache.hudi.schema.SchemaProvider;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Collections;
+
+/**
+ * Provides the Hudi Writer for the {@link org.apache.hudi.connect.transaction.TransactionParticipant}
+ * to write the incoming records to Hudi.
+ */
+public class KafkaConnectWriterProvider implements ConnectWriterProvider<WriteStatus> {
+
+  private static final Logger LOG = LogManager.getLogger(KafkaConnectWriterProvider.class);
+
+  private final KafkaConnectConfigs connectConfigs;
+  private final HoodieEngineContext context;
+  private final HoodieWriteConfig writeConfig;
+  private final HoodieJavaWriteClient<HoodieAvroPayload> hudiJavaClient;
+  private final KeyGenerator keyGenerator;
+  private final SchemaProvider schemaProvider;
+
+  public KafkaConnectWriterProvider(
+      KafkaConnectConfigs connectConfigs,
+      TopicPartition partition) throws HoodieException {
+    this.connectConfigs = connectConfigs;
+    Configuration hadoopConf = KafkaConnectUtils.getDefaultHadoopConf();
+
+    try {
+      this.schemaProvider = StringUtils.isNullOrEmpty(connectConfigs.getSchemaProviderClass()) ? null
+          : (SchemaProvider) ReflectionUtils.loadClass(connectConfigs.getSchemaProviderClass(),
+          new TypedProperties(connectConfigs.getProps()));
+
+      this.keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(
+          new TypedProperties(connectConfigs.getProps()));
+
+      // Build the write config used by the underlying Hudi Java write client
+      writeConfig = HoodieWriteConfig.newBuilder()
+          .withProperties(connectConfigs.getProps())
+          .withFileIdPrefixProviderClassName(KafkaConnectFileIdPrefixProvider.class.getName())
+          .withProps(Collections.singletonMap(
+              KafkaConnectFileIdPrefixProvider.KAFKA_CONNECT_PARTITION_ID,
+              String.valueOf(partition)))
+          .withSchema(schemaProvider.getSourceSchema().toString())
+          .withAutoCommit(false)
+          .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+          .build();
+
+      context = new HoodieJavaEngineContext(hadoopConf);
+
+      hudiJavaClient = new HoodieJavaWriteClient<>(context, writeConfig);
+    } catch (Throwable e) {
+      throw new HoodieException("Fatal error instantiating Hudi Write Provider ", e);
+    }
+  }
+
+  public AbstractConnectWriter getWriter(String commitTime) {
+    return new BufferedConnectWriter(
+        context,
+        hudiJavaClient,
+        commitTime,
+        connectConfigs,
+        writeConfig,
+        keyGenerator,
+        schemaProvider);
+  }
+}
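
To tie the pieces together, here is a hypothetical end-to-end write for a single commit on one partition: obtain a writer for the commit time, feed it Kafka Connect SinkRecords, and close it to flush and collect the write statuses. It assumes the configs carry the table base path, table name and schema provider properties required by the write client, and that value.converter is set to the StringConverter so the record value is a JSON string; the topic name and payload fields are illustrative.

package org.apache.hudi.connect.examples;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.connect.writers.AbstractConnectWriter;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.connect.writers.KafkaConnectWriterProvider;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

import java.io.IOException;
import java.util.List;

public class WriterProviderExample {

  public static List<WriteStatus> writeOneCommit(KafkaConnectConfigs configs,
                                                 String commitTime) throws IOException {
    TopicPartition partition = new TopicPartition("hudi-test-topic", 0);
    KafkaConnectWriterProvider writerProvider = new KafkaConnectWriterProvider(configs, partition);

    // One writer is created per commit; the commit time is handed out by the coordinator
    AbstractConnectWriter writer = writerProvider.getWriter(commitTime);
    SinkRecord record = new SinkRecord(partition.topic(), partition.partition(),
        Schema.STRING_SCHEMA, "key-1",
        Schema.STRING_SCHEMA, "{\"volume\": 123, \"symbol\": \"HUDI\", \"date\": \"2021/09/10\"}",
        0L);
    writer.writeRecord(record);

    // close() flushes the buffered records via bulk insert and returns the statuses
    return writer.close();
  }
}
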
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
new file mode 100644
index 0000000..21940ab
--- /dev/null
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.connect.transaction.ConnectTransactionCoordinator;
+import org.apache.hudi.connect.transaction.ControlEvent;
+import org.apache.hudi.connect.transaction.TransactionCoordinator;
+import org.apache.hudi.connect.transaction.TransactionParticipant;
+import org.apache.hudi.connect.writers.KafkaConnectConfigs;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.helper.MockConnectTransactionServices;
+import org.apache.hudi.helper.MockKafkaControlAgent;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class TestConnectTransactionCoordinator {
+
+  private static final String TOPIC_NAME = "kafka-connect-test-topic";
+  private static final int NUM_PARTITIONS = 4;
+  private static final int MAX_COMMIT_ROUNDS = 5;
+  private static final int TEST_TIMEOUT_SECS = 60;
+
+  private KafkaConnectConfigs configs;
+  private MockParticipant participant;
+  private MockKafkaControlAgent kafkaControlAgent;
+  private MockConnectTransactionServices transactionServices;
+  private CountDownLatch latch;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    transactionServices = new MockConnectTransactionServices();
+    configs = KafkaConnectConfigs.newBuilder()
+        .withCommitIntervalSecs(1L)
+        .withCoordinatorWriteTimeoutSecs(1L)
+        .build();
+    latch = new CountDownLatch(1);
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = MockParticipant.TestScenarios.class)
+  public void testSingleCommitScenario(MockParticipant.TestScenarios scenario) throws InterruptedException {
+    kafkaControlAgent = new MockKafkaControlAgent();
+    participant = new MockParticipant(kafkaControlAgent, latch, scenario, MAX_COMMIT_ROUNDS);
+    participant.start();
+
+    // Test the coordinator using the mock participant
+    TransactionCoordinator coordinator = new ConnectTransactionCoordinator(
+        configs,
+        new TopicPartition(TOPIC_NAME, 0),
+        kafkaControlAgent,
+        transactionServices,
+        (bootstrapServers, topicName) -> NUM_PARTITIONS);
+    coordinator.start();
+
+    latch.await(TEST_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+    if (latch.getCount() > 0) {
+      throw new HoodieException("Test timedout resulting in failure");
+    }
+    coordinator.stop();
+    participant.stop();
+  }
+
+  /**
+   * A mock Transaction Participant that exercises all the test scenarios
+   * for the coordinator, as listed in {@link TestScenarios}.
+   */
+  private static class MockParticipant implements TransactionParticipant {
+
+    private final MockKafkaControlAgent kafkaControlAgent;
+    private final TopicPartition partition;
+    private final CountDownLatch latch;
+    private final TestScenarios testScenario;
+    private final int maxNumberCommitRounds;
+    private final Map<Integer, Long> kafkaOffsetsCommitted;
+
+    private ControlEvent.MsgType expectedMsgType;
+    private int numberCommitRounds;
+
+    public MockParticipant(MockKafkaControlAgent kafkaControlAgent,
+                           CountDownLatch latch,
+                           TestScenarios testScenario,
+                           int maxNumberCommitRounds) {
+      this.kafkaControlAgent = kafkaControlAgent;
+      this.latch = latch;
+      this.testScenario = testScenario;
+      this.maxNumberCommitRounds = maxNumberCommitRounds;
+      this.partition = new TopicPartition(TOPIC_NAME, (NUM_PARTITIONS - 1));
+      this.kafkaOffsetsCommitted = new HashMap<>();
+      expectedMsgType = ControlEvent.MsgType.START_COMMIT;
+      numberCommitRounds = 0;
+    }
+
+    @Override
+    public void start() {
+      kafkaControlAgent.registerTransactionParticipant(this);
+    }
+
+    @Override
+    public void stop() {
+      kafkaControlAgent.deregisterTransactionParticipant(this);
+    }
+
+    @Override
+    public void buffer(SinkRecord record) {
+    }
+
+    @Override
+    public void processRecords() {
+    }
+
+    @Override
+    public TopicPartition getPartition() {
+      return partition;
+    }
+
+    @Override
+    public void processControlEvent(ControlEvent message) {
+      assertEquals(message.getSenderType(), ControlEvent.SenderType.COORDINATOR);
+      assertEquals(message.senderPartition().topic(), partition.topic());
+      testScenarios(message);
+    }
+
+    @Override
+    public long getLastKafkaCommittedOffset() {
+      return 0;
+    }
+
+    private void testScenarios(ControlEvent message) {
+      assertEquals(expectedMsgType, message.getMsgType());
+
+      switch (message.getMsgType()) {
+        case START_COMMIT:
+          expectedMsgType = ControlEvent.MsgType.END_COMMIT;
+          break;
+        case END_COMMIT:
+          assertEquals(kafkaOffsetsCommitted, message.getCoordinatorInfo().getGlobalKafkaCommitOffsets());
+          int numSuccessPartitions;
+          Map<Integer, Long> kafkaOffsets = new HashMap<>();
+          List<ControlEvent> controlEvents = new ArrayList<>();
+          // Prepare the WriteStatuses for all partitions
+          for (int i = 1; i <= NUM_PARTITIONS; i++) {
+            try {
+              long kafkaOffset = (long) (Math.random() * 10000);
+              kafkaOffsets.put(i, kafkaOffset);
+              ControlEvent event = successWriteStatus(
+                  message.getCommitTime(),
+                  new TopicPartition(TOPIC_NAME, i),
+                  kafkaOffset);
+              controlEvents.add(event);
+            } catch (Exception exception) {
+              throw new HoodieException("Fatal error sending control event to Coordinator");
+            }
+          }
+
+          switch (testScenario) {
+            case ALL_CONNECT_TASKS_SUCCESS:
+              numSuccessPartitions = NUM_PARTITIONS;
+              kafkaOffsetsCommitted.putAll(kafkaOffsets);
+              expectedMsgType = ControlEvent.MsgType.ACK_COMMIT;
+              break;
+            case SUBSET_CONNECT_TASKS_FAILED:
+              numSuccessPartitions = NUM_PARTITIONS / 2;
+              expectedMsgType = ControlEvent.MsgType.START_COMMIT;
+              break;
+            default:
+              throw new HoodieException("Unknown test scenario " + testScenario);
+          }
+
+          // Send events based on test scenario
+          for (int i = 0; i < numSuccessPartitions; i++) {
+            kafkaControlAgent.publishMessage(controlEvents.get(i));
+          }
+          break;
+        case ACK_COMMIT:
+          if (numberCommitRounds >= maxNumberCommitRounds) {
+            latch.countDown();
+          }
+          expectedMsgType = ControlEvent.MsgType.START_COMMIT;
+          break;
+        default:
+          throw new HoodieException("Illegal control message type " + message.getMsgType());
+      }
+
+      if (message.getMsgType().equals(ControlEvent.MsgType.START_COMMIT)) {
+        if (numberCommitRounds >= maxNumberCommitRounds) {
+          latch.countDown();
+        }
+        numberCommitRounds++;
+        expectedMsgType = ControlEvent.MsgType.END_COMMIT;
+      }
+    }
+
+    public enum TestScenarios {
+      SUBSET_CONNECT_TASKS_FAILED,
+      ALL_CONNECT_TASKS_SUCCESS
+    }
+
+    private static ControlEvent successWriteStatus(String commitTime,
+                                                   TopicPartition partition,
+                                                   long kafkaOffset) throws Exception {
+      // Build a WriteStatus with some successful records to simulate a participant's write result
+      WriteStatus writeStatus = new WriteStatus(false, 1.0);
+      for (int i = 0; i < 1000; i++) {
+        writeStatus.markSuccess(mock(HoodieRecord.class), Option.empty());
+      }
+      return new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS,
+          ControlEvent.SenderType.PARTICIPANT,
+          commitTime,
+          partition)
+          .setParticipantInfo(new ControlEvent.ParticipantInfo(
+              Collections.singletonList(writeStatus),
+              kafkaOffset,
+              ControlEvent.OutcomeType.WRITE_SUCCESS))
+          .build();
+    }
+  }
+}
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
new file mode 100644
index 0000000..900ba46
--- /dev/null
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.connect.kafka.KafkaControlAgent;
+import org.apache.hudi.connect.transaction.ConnectTransactionParticipant;
+import org.apache.hudi.connect.transaction.ControlEvent;
+import org.apache.hudi.connect.transaction.TransactionCoordinator;
+import org.apache.hudi.connect.writers.KafkaConnectConfigs;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.helper.MockKafkaControlAgent;
+import org.apache.hudi.helper.TestHudiWriterProvider;
+import org.apache.hudi.helper.TestKafkaConnect;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestConnectTransactionParticipant {
+
+  private static final String TOPIC_NAME = "kafka-connect-test-topic";
+  private static final int PARTITION_NUMBER = 4;
+
+  private ConnectTransactionParticipant participant;
+  private MockCoordinator coordinator;
+  private TopicPartition partition;
+  private KafkaConnectConfigs configs;
+  private KafkaControlAgent kafkaControlAgent;
+  private TestHudiWriterProvider testHudiWriterProvider;
+  private TestKafkaConnect testKafkaConnect;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    partition = new TopicPartition(TOPIC_NAME, PARTITION_NUMBER);
+    kafkaControlAgent = new MockKafkaControlAgent();
+    testKafkaConnect = new TestKafkaConnect(partition);
+    coordinator = new MockCoordinator(kafkaControlAgent);
+    coordinator.start();
+    configs = KafkaConnectConfigs.newBuilder()
+        .build();
+    initializeParticipant();
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = CoordinatorFailureTestScenarios.class)
+  public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios testScenario) {
+    int expectedRecordsWritten = 0;
+    switch (testScenario) {
+      case REGULAR_SCENARIO:
+        expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
+        assertTrue(testKafkaConnect.isPaused());
+        break;
+      case COORDINATOR_FAILED_AFTER_START_COMMIT:
+        testKafkaConnect.putRecordsToParticipant();
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+        testKafkaConnect.putRecordsToParticipant();
+        // Coordinator Failed
+        initializeCoordinator();
+        break;
+      case COORDINATOR_FAILED_AFTER_END_COMMIT:
+        testKafkaConnect.putRecordsToParticipant();
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+        testKafkaConnect.putRecordsToParticipant();
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+        expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
+        // Coordinator Failed
+        initializeCoordinator();
+        break;
+      default:
+        throw new HoodieException("Unknown test scenario " + testScenario);
+    }
+
+    // Regular Case or Coordinator Recovery Case
+    coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+    expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
+    assertTrue(testKafkaConnect.isResumed());
+    coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+    testKafkaConnect.putRecordsToParticipant();
+    assertTrue(testKafkaConnect.isPaused());
+    coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
+    testKafkaConnect.putRecordsToParticipant();
+    assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
+    // Ensure Coordinator and participant are in sync in the kafka offsets
+    assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+
+    participant.stop();
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = ParticipantFailureTestScenarios.class)
+  public void testAllParticipantFailureScenarios(ParticipantFailureTestScenarios testScenario) {
+    int expectedRecordsWritten = 0;
+    switch (testScenario) {
+      case FAILURE_BEFORE_START_COMMIT:
+        testKafkaConnect.putRecordsToParticipant();
+        // Participant fails
+        initializeParticipant();
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+        expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
+        assertTrue(testKafkaConnect.isResumed());
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+        testKafkaConnect.putRecordsToParticipant();
+        assertTrue(testKafkaConnect.isPaused());
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
+        testKafkaConnect.putRecordsToParticipant();
+        assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
+        // Ensure Coordinator and participant are in sync in the kafka offsets
+        assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+        break;
+      case FAILURE_AFTER_START_COMMIT:
+        testKafkaConnect.putRecordsToParticipant();
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+        testKafkaConnect.putRecordsToParticipant();
+        // Participant fails
+        initializeParticipant();
+        testKafkaConnect.putRecordsToParticipant();
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+        testKafkaConnect.putRecordsToParticipant();
+        assertTrue(testKafkaConnect.isPaused());
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
+        testKafkaConnect.putRecordsToParticipant();
+        assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
+        // Ensure Coordinator and participant are in sync in the kafka offsets
+        assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+        break;
+      case FAILURE_AFTER_END_COMMIT:
+        testKafkaConnect.putRecordsToParticipant();
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+        testKafkaConnect.putRecordsToParticipant();
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+        testKafkaConnect.putRecordsToParticipant();
+        // Participant fails
+        initializeParticipant();
+        testKafkaConnect.putRecordsToParticipant();
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+        testKafkaConnect.putRecordsToParticipant();
+        assertTrue(testKafkaConnect.isPaused());
+        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
+        testKafkaConnect.putRecordsToParticipant();
+        assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
+        // Ensure the coordinator and participant are in sync on the Kafka offsets
+        assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+        break;
+      default:
+        throw new HoodieException("Unknown test scenario " + testScenario);
+    }
+  }
+
+  private void initializeParticipant() {
+    testHudiWriterProvider = new TestHudiWriterProvider();
+    participant = new ConnectTransactionParticipant(
+        partition,
+        kafkaControlAgent,
+        testKafkaConnect,
+        testHudiWriterProvider);
+    testKafkaConnect.setParticipant(participant);
+    participant.start();
+  }
+
+  private void initializeCoordinator() {
+    coordinator = new MockCoordinator(kafkaControlAgent);
+    coordinator.start();
+  }
+
+  private static class MockCoordinator implements TransactionCoordinator {
+
+    private static int currentCommitTime = 101;
+
+    private final KafkaControlAgent kafkaControlAgent;
+    private final TopicPartition partition;
+
+    private Option<ControlEvent> lastReceivedWriteStatusEvent;
+    private long committedKafkaOffset;
+
+    public MockCoordinator(KafkaControlAgent kafkaControlAgent) {
+      this.kafkaControlAgent = kafkaControlAgent;
+      partition = new TopicPartition(TOPIC_NAME, 0);
+      lastReceivedWriteStatusEvent = Option.empty();
+      committedKafkaOffset = 0L;
+    }
+
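+    // Publishes a control event to participants on behalf of the coordinator; a new commit time is generated for each START_COMMIT.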
+    public void sendEventFromCoordinator(
+        ControlEvent.MsgType type) {
+      try {
+        if (type.equals(ControlEvent.MsgType.START_COMMIT)) {
+          ++currentCommitTime;
+        }
+        kafkaControlAgent.publishMessage(new ControlEvent.Builder(
+            type,
+            ControlEvent.SenderType.COORDINATOR,
+            String.valueOf(currentCommitTime),
+            partition)
+            .setCoordinatorInfo(new ControlEvent.CoordinatorInfo(
+                Collections.singletonMap(PARTITION_NUMBER, committedKafkaOffset)))
+            .build());
+      } catch (Exception exception) {
+        throw new HoodieException("Fatal error sending control event to Participant");
+      }
+    }
+
+    public Option<ControlEvent> getLastReceivedWriteStatusEvent() {
+      return lastReceivedWriteStatusEvent;
+    }
+
+    public long getCommittedKafkaOffset() {
+      return committedKafkaOffset;
+    }
+
+    @Override
+    public void start() {
+      kafkaControlAgent.registerTransactionCoordinator(this);
+    }
+
+    @Override
+    public void stop() {
+      kafkaControlAgent.deregisterTransactionCoordinator(this);
+    }
+
+    @Override
+    public TopicPartition getPartition() {
+      return partition;
+    }
+
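+    // Records WRITE_STATUS events from participants and advances the committed Kafka offset accordingly.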
+    @Override
+    public void processControlEvent(ControlEvent message) {
+      if (message.getMsgType().equals(ControlEvent.MsgType.WRITE_STATUS)) {
+        lastReceivedWriteStatusEvent = Option.of(message);
+        assertTrue(message.getParticipantInfo().getKafkaCommitOffset() >= committedKafkaOffset);
+        committedKafkaOffset = message.getParticipantInfo().getKafkaCommitOffset();
+      }
+    }
+  }
+
+  private enum CoordinatorFailureTestScenarios {
+    REGULAR_SCENARIO,
+    COORDINATOR_FAILED_AFTER_START_COMMIT,
+    COORDINATOR_FAILED_AFTER_END_COMMIT,
+  }
+
+  private enum ParticipantFailureTestScenarios {
+    FAILURE_BEFORE_START_COMMIT,
+    FAILURE_AFTER_START_COMMIT,
+    FAILURE_AFTER_END_COMMIT,
+  }
+
+}
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockConnectTransactionServices.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockConnectTransactionServices.java
new file mode 100644
index 0000000..6994c65
--- /dev/null
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockConnectTransactionServices.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.helper;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.connect.writers.ConnectTransactionServices;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Mock implementation of {@link ConnectTransactionServices} that generates
+ * a unique commit time for testing purposes.
+ */
+public class MockConnectTransactionServices implements ConnectTransactionServices {
+
+  private int commitTime;
+
+  public MockConnectTransactionServices() {
+    commitTime = 100;
+  }
+
+  @Override
+  public String startCommit() {
+    commitTime++;
+    return String.valueOf(commitTime);
+  }
+
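+  // Validates that the commit being ended matches the last commit that was started.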
+  @Override
+  public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+    assertEquals(String.valueOf(this.commitTime), commitTime);
+  }
+
+  @Override
+  public Map<String, String> fetchLatestExtraCommitMetadata() {
+    return new HashMap<>();
+  }
+}
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaControlAgent.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaControlAgent.java
new file mode 100644
index 0000000..529cd75
--- /dev/null
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaControlAgent.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.helper;
+
+import org.apache.hudi.connect.kafka.KafkaControlAgent;
+import org.apache.hudi.connect.transaction.ControlEvent;
+import org.apache.hudi.connect.transaction.TransactionCoordinator;
+import org.apache.hudi.connect.transaction.TransactionParticipant;
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A mock Kafka Control Agent that supports the testing
+ * of a {@link TransactionCoordinator} with multiple
+ * instances of {@link TransactionParticipant}.
+ */
+public class MockKafkaControlAgent implements KafkaControlAgent {
+
+  private final Map<String, TransactionCoordinator> coordinators;
+  private final Map<String, List<TransactionParticipant>> participants;
+
+  public MockKafkaControlAgent() {
+    coordinators = new HashMap<>();
+    participants = new HashMap<>();
+  }
+
+  @Override
+  public void registerTransactionCoordinator(TransactionCoordinator coordinator) {
+    coordinators.put(coordinator.getPartition().topic(), coordinator);
+  }
+
+  @Override
+  public void registerTransactionParticipant(TransactionParticipant participant) {
+    if (!participants.containsKey(participant.getPartition().topic())) {
+      participants.put(participant.getPartition().topic(), new ArrayList<>());
+    }
+    participants.get(participant.getPartition().topic()).add(participant);
+  }
+
+  @Override
+  public void deregisterTransactionCoordinator(TransactionCoordinator coordinator) {
+    coordinators.remove(coordinator.getPartition().topic());
+  }
+
+  @Override
+  public void deregisterTransactionParticipant(TransactionParticipant worker) {
+    if (participants.containsKey(worker.getPartition().topic())) {
+      participants.get(worker.getPartition().topic()).remove(worker);
+    }
+  }
+
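+  // Relays coordinator events to every registered participant of the topic, and participant events back to the coordinator.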
+  @Override
+  public void publishMessage(ControlEvent message) {
+    try {
+      String topic = message.senderPartition().topic();
+      if (message.getSenderType().equals(ControlEvent.SenderType.COORDINATOR)) {
+        for (TransactionParticipant participant : participants.get(topic)) {
+          participant.processControlEvent(message);
+        }
+      } else {
+        coordinators.get(topic).processControlEvent(message);
+      }
+    } catch (Exception exception) {
+      throw new HoodieException("Fatal error trying to relay Kafka Control Messages for Testing.");
+    }
+  }
+}
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestHudiWriterProvider.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestHudiWriterProvider.java
new file mode 100644
index 0000000..45c9b03
--- /dev/null
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestHudiWriterProvider.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.helper;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.connect.writers.ConnectWriter;
+import org.apache.hudi.connect.writers.ConnectWriterProvider;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.List;
+
+/**
+ * Helper class that provides a Hudi writer and
+ * maintains stats that are used for test validation.
+ */
+public class TestHudiWriterProvider implements ConnectWriterProvider<WriteStatus>  {
+
+  private TestHudiWriter currentWriter;
+
+  public TestHudiWriterProvider() {
+  }
+
+  public int getLatestNumberWrites() {
+    return (currentWriter != null) ? currentWriter.numberRecords : 0;
+  }
+
+  public boolean isClosed() {
+    return currentWriter == null || currentWriter.isClosed;
+  }
+
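+  // Creates a fresh in-memory writer per commit; stats are tracked for the most recent writer only.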
+  @Override
+  public ConnectWriter<WriteStatus> getWriter(String commitTime) {
+    currentWriter = new TestHudiWriter();
+    return currentWriter;
+  }
+
+  private static class TestHudiWriter implements ConnectWriter<WriteStatus> {
+
+    private int numberRecords;
+    private boolean isClosed;
+
+    public TestHudiWriter() {
+      this.numberRecords = 0;
+      this.isClosed = false;
+    }
+
+    public int getNumberRecords() {
+      return numberRecords;
+    }
+
+    public boolean isClosed() {
+      return isClosed;
+    }
+
+    @Override
+    public void writeRecord(SinkRecord record) {
+      numberRecords++;
+    }
+
+    @Override
+    public List<WriteStatus> close() {
+      isClosed = true;
+      return null;
+    }
+  }
+}
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java
new file mode 100644
index 0000000..9530809
--- /dev/null
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.helper;
+
+import org.apache.hudi.connect.transaction.TransactionParticipant;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Helper class that emulates the Kafka Connect framework and additionally
+ * implements {@link SinkTaskContext} for testing purposes.
+ */
+public class TestKafkaConnect implements SinkTaskContext {
+
+  private static final int NUM_RECORDS_BATCH = 5;
+  private final TopicPartition testPartition;
+
+  private TransactionParticipant participant;
+  private long currentKafkaOffset;
+  private boolean isPaused;
+
+  public TestKafkaConnect(TopicPartition testPartition) {
+    this.testPartition = testPartition;
+    isPaused = false;
+    currentKafkaOffset = 0L;
+  }
+
+  public void setParticipant(TransactionParticipant participant) {
+    this.participant = participant;
+  }
+
+  public boolean isPaused() {
+    return isPaused;
+  }
+
+  public boolean isResumed() {
+    return !isPaused;
+  }
+
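+  // Emulates the Kafka Connect framework delivering a batch of records to the participant and triggering processing.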
+  public int putRecordsToParticipant() {
+    for (int i = 1; i <= NUM_RECORDS_BATCH; i++) {
+      participant.buffer(getNextKafkaRecord());
+    }
+    participant.processRecords();
+    return NUM_RECORDS_BATCH;
+  }
+
+  public SinkRecord getNextKafkaRecord() {
+    return new SinkRecord(testPartition.topic(),
+        testPartition.partition(),
+        Schema.OPTIONAL_BYTES_SCHEMA,
+        ("key-" + currentKafkaOffset).getBytes(),
+        Schema.OPTIONAL_BYTES_SCHEMA,
+        "value".getBytes(), currentKafkaOffset++);
+  }
+
+  public long getCurrentKafkaOffset() {
+    return currentKafkaOffset;
+  }
+
+  @Override
+  public void pause(TopicPartition... partitions) {
+    if (Arrays.stream(partitions).allMatch(testPartition::equals)) {
+      isPaused = true;
+    }
+  }
+
+  @Override
+  public void resume(TopicPartition... partitions) {
+    if (Arrays.stream(partitions).allMatch(testPartition::equals)) {
+      isPaused = false;
+    }
+  }
+
+  @Override
+  public void offset(Map<TopicPartition, Long> offsets) {
+    for (TopicPartition tp : offsets.keySet()) {
+      if (tp.equals(testPartition)) {
+        currentKafkaOffset = offsets.get(tp);
+      }
+    }
+  }
+
+  @Override
+  public void offset(TopicPartition tp, long offset) {
+    if (tp.equals(testPartition)) {
+      currentKafkaOffset = offset;
+    }
+  }
+
+  @Override
+  public Map<String, String> configs() {
+    return null;
+  }
+
+  @Override
+  public void timeout(long timeoutMs) {
+
+  }
+
+  @Override
+  public Set<TopicPartition> assignment() {
+    return null;
+  }
+
+  @Override
+  public void requestCommit() {
+
+  }
+}
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
new file mode 100644
index 0000000..3ca64c3
--- /dev/null
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.writers;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.connect.writers.AbstractConnectWriter;
+import org.apache.hudi.connect.writers.KafkaConnectConfigs;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.schema.SchemaProvider;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestAbstractConnectWriter {
+
+  private static final String TOPIC_NAME = "kafka-connect-test-topic";
+  private static final int PARTITION_NUMBER = 4;
+  private static final int NUM_RECORDS = 10;
+  private static final int RECORD_KEY_INDEX = 0;
+
+  private KafkaConnectConfigs configs;
+  private TestKeyGenerator keyGenerator;
+  private SchemaProvider schemaProvider;
+  private long currentKafkaOffset;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    keyGenerator = new TestKeyGenerator(new TypedProperties());
+    schemaProvider = new TestSchemaProvider();
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = TestInputFormats.class)
+  public void testAbstractWriterForAllFormats(TestInputFormats inputFormats) throws Exception {
+    Schema schema = schemaProvider.getSourceSchema();
+    List<?> inputRecords;
+    List<HoodieRecord> expectedRecords;
+
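+    // Generate input records in the requested format and compute the expected Hoodie records for validation.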
+    String formatConverter;
+    switch (inputFormats) {
+      case JSON_STRING:
+        formatConverter = AbstractConnectWriter.KAFKA_STRING_CONVERTER;
+        GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(schema, schema);
+        inputRecords = SchemaTestUtil.generateTestJsonRecords(0, NUM_RECORDS);
+        expectedRecords = ((List<String>) inputRecords).stream().map(s -> {
+          try {
+            return HoodieAvroUtils.rewriteRecord((GenericRecord) reader.read(null, DecoderFactory.get().jsonDecoder(schema, s)),
+                schema);
+          } catch (IOException exception) {
+            throw new HoodieException("Error converting JSON records to Avro", exception);
+          }
+        }).map(p -> convertToHoodieRecords(p, p.get(RECORD_KEY_INDEX).toString(), "000/00/00")).collect(Collectors.toList());
+        break;
+      case AVRO:
+        formatConverter = AbstractConnectWriter.KAFKA_AVRO_CONVERTER;
+        inputRecords = SchemaTestUtil.generateTestRecords(0, NUM_RECORDS);
+        expectedRecords = inputRecords.stream().map(s -> HoodieAvroUtils.rewriteRecord((GenericRecord) s, schema))
+            .map(p -> convertToHoodieRecords(p, p.get(RECORD_KEY_INDEX).toString(), "000/00/00")).collect(Collectors.toList());
+        break;
+      default:
+        throw new HoodieException("Unknown test scenario " + inputFormats);
+    }
+
+    configs = KafkaConnectConfigs.newBuilder()
+        .withProperties(
+            Collections.singletonMap(KafkaConnectConfigs.KAFKA_VALUE_CONVERTER, formatConverter))
+        .build();
+    AbstractHudiConnectWriterTestWrapper writer = new AbstractHudiConnectWriterTestWrapper(
+        configs,
+        keyGenerator,
+        schemaProvider);
+
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      writer.writeRecord(getNextKafkaRecord(inputRecords.get(i)));
+    }
+
+    validateRecords(writer.getWrittenRecords(), expectedRecords);
+  }
+
+  private static void validateRecords(List<HoodieRecord> actualRecords, List<HoodieRecord> expectedRecords) {
+    assertEquals(actualRecords.size(), expectedRecords.size());
+
+    actualRecords.sort(Comparator.comparing(HoodieRecord::getRecordKey));
+    expectedRecords.sort(Comparator.comparing(HoodieRecord::getRecordKey));
+
+    // Iterate through both lists and compare the record keys one by one.
+    Iterator<HoodieRecord> it1 = actualRecords.iterator();
+    Iterator<HoodieRecord> it2 = expectedRecords.iterator();
+    while (it1.hasNext()) {
+      HoodieRecord t1 = it1.next();
+      HoodieRecord t2 = it2.next();
+      assertEquals(t1.getRecordKey(), t2.getRecordKey());
+    }
+  }
+
+  private SinkRecord getNextKafkaRecord(Object record) {
+    return new SinkRecord(TOPIC_NAME, PARTITION_NUMBER,
+        org.apache.kafka.connect.data.Schema.OPTIONAL_BYTES_SCHEMA,
+        ("key-" + currentKafkaOffset).getBytes(),
+        org.apache.kafka.connect.data.Schema.OPTIONAL_BYTES_SCHEMA,
+        record, currentKafkaOffset++);
+  }
+
+  private static class AbstractHudiConnectWriterTestWrapper extends AbstractConnectWriter {
+
+    private List<HoodieRecord> writtenRecords;
+
+    public AbstractHudiConnectWriterTestWrapper(KafkaConnectConfigs connectConfigs, KeyGenerator keyGenerator, SchemaProvider schemaProvider) {
+      super(connectConfigs, keyGenerator, schemaProvider);
+      writtenRecords = new ArrayList<>();
+    }
+
+    public List<HoodieRecord> getWrittenRecords() {
+      return writtenRecords;
+    }
+
+    @Override
+    protected void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record) {
+      writtenRecords.add(record);
+    }
+
+    @Override
+    protected List<WriteStatus> flushHudiRecords() {
+      return null;
+    }
+  }
+
+  private static HoodieRecord convertToHoodieRecords(IndexedRecord iRecord, String key, String partitionPath) {
+    return new HoodieRecord<>(new HoodieKey(key, partitionPath),
+        new HoodieAvroPayload(Option.of((GenericRecord) iRecord)));
+  }
+
+  private enum TestInputFormats {
+    AVRO,
+    JSON_STRING
+  }
+
+  static class TestKeyGenerator extends KeyGenerator {
+
+    protected TestKeyGenerator(TypedProperties config) {
+      super(config);
+    }
+
+    @Override
+    public HoodieKey getKey(GenericRecord record) {
+      return new HoodieKey(record.get(RECORD_KEY_INDEX).toString(), "000/00/00");
+    }
+  }
+
+  static class TestSchemaProvider extends SchemaProvider {
+
+    @Override
+    public Schema getSourceSchema() {
+      try {
+        return SchemaTestUtil.getSimpleSchema();
+      } catch (IOException exception) {
+        throw new HoodieException("Fatal error parsing schema", exception);
+      }
+    }
+  }
+}
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java
new file mode 100644
index 0000000..d1813e1
--- /dev/null
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.writers;
+
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.connect.writers.BufferedConnectWriter;
+import org.apache.hudi.connect.writers.KafkaConnectConfigs;
+import org.apache.hudi.schema.SchemaProvider;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+
+public class TestBufferedConnectWriter {
+
+  private static final int NUM_RECORDS = 10;
+  private static final String COMMIT_TIME = "101";
+
+  private HoodieJavaWriteClient mockHoodieJavaWriteClient;
+  private HoodieJavaEngineContext javaEngineContext;
+  private KafkaConnectConfigs configs;
+  private HoodieWriteConfig writeConfig;
+  private SchemaProvider schemaProvider;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    mockHoodieJavaWriteClient = mock(HoodieJavaWriteClient.class);
+    Configuration hadoopConf = new Configuration();
+    javaEngineContext = new HoodieJavaEngineContext(hadoopConf);
+    configs = KafkaConnectConfigs.newBuilder().build();
+    schemaProvider = new TestAbstractConnectWriter.TestSchemaProvider();
+    writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp")
+        .withSchema(schemaProvider.getSourceSchema().toString())
+        .build();
+  }
+
+  @Test
+  public void testSimpleWriteAndFlush() throws Exception {
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
+    List<HoodieRecord> records = dataGen.generateInserts(COMMIT_TIME, NUM_RECORDS);
+
+    BufferedConnectWriter writer = new BufferedConnectWriter(
+        javaEngineContext,
+        mockHoodieJavaWriteClient,
+        COMMIT_TIME,
+        configs,
+        writeConfig,
+        null,
+        schemaProvider);
+
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      writer.writeHudiRecord(records.get(i));
+    }
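+    // Records are only buffered so far; the underlying write client should not have been invoked yet.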
+    Mockito.verify(mockHoodieJavaWriteClient, times(0))
+        .bulkInsertPreppedRecords(anyList(), eq(COMMIT_TIME), eq(Option.empty()));
+
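+    // Flushing should trigger exactly one bulk insert containing all buffered records.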
+    writer.flushHudiRecords();
+    final ArgumentCaptor<List<HoodieRecord>> actualRecords = ArgumentCaptor.forClass(List.class);
+    Mockito.verify(mockHoodieJavaWriteClient, times(1))
+        .bulkInsertPreppedRecords(actualRecords.capture(), eq(COMMIT_TIME), eq(Option.empty()));
+
+    actualRecords.getValue().sort(Comparator.comparing(HoodieRecord::getRecordKey));
+    records.sort(Comparator.comparing(HoodieRecord::getRecordKey));
+
+    assertEquals(records, actualRecords.getValue());
+  }
+}
diff --git a/hudi-kafka-connect/src/test/resources/log4j-surefire.properties b/hudi-kafka-connect/src/test/resources/log4j-surefire.properties
new file mode 100644
index 0000000..9ee04e1
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/log4j-surefire.properties
@@ -0,0 +1,32 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=WARN, CONSOLE
+log4j.logger.org.apache=INFO
+log4j.logger.org.apache.hudi=DEBUG
+log4j.logger.org.apache.hadoop.hbase=ERROR
+
+# CONSOLE is set to be a ConsoleAppender.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+# CONSOLE uses PatternLayout.
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
+log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
+log4j.appender.CONSOLE.filter.a.LevelMin=WARN
+log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
+
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 88d1d8c..53d68c3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -352,6 +352,7 @@ public class DeltaSync implements Serializable {
         }
       }
     } else {
+      // initialize the table for the first time.
       String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
       HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(cfg.tableType)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
index bcbdbf0..2410798 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
@@ -29,7 +29,8 @@ import org.apache.spark.api.java.JavaSparkContext;
 import java.io.Serializable;
 
 /**
- * Class to provide schema for reading data and also writing into a Hoodie table.
+ * Class to provide schema for reading data and also writing into a Hoodie table,
+ * used by deltastreamer (runs over Spark).
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
 public abstract class SchemaProvider implements Serializable {
diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml
new file mode 100644
index 0000000..14bc4e4
--- /dev/null
+++ b/packaging/hudi-kafka-connect-bundle/pom.xml
@@ -0,0 +1,186 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hudi</artifactId>
+        <groupId>org.apache.hudi</groupId>
+        <version>0.10.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>hudi-kafka-connect-bundle</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <checkstyle.skip>true</checkstyle.skip>
+        <main.basedir>${project.parent.basedir}</main.basedir>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${maven-shade-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createSourcesJar>${shadeSources}</createSourcesJar>
+                            <dependencyReducedPomLocation>${project.build.directory}/dependency-reduced-pom.xml
+                            </dependencyReducedPomLocation>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+                                    <addHeader>true</addHeader>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                                    <resource>META-INF/LICENSE</resource>
+                                    <file>target/classes/META-INF/LICENSE</file>
+                                </transformer>
+                            </transformers>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>com.amazonaws.*</exclude>
+                                    <exclude>org.apache.zookeeper:zookeeper</exclude>
+                                    <exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
+                                    <exclude>commons-httpclient:commons-httpclient</exclude>
+                                    <exclude>org.apache.htrace:htrace-core</exclude>
+                                    <exclude>org.jamon:jamon-runtime</exclude>
+                                    <exclude>jdk.tools:jdk.tools</exclude>
+                                    <exclude>junit:junit</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                        <exclude>META-INF/services/javax.*</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+            <resource>
+                <directory>src/test/resources</directory>
+            </resource>
+        </resources>
+    </build>
+
+    <dependencies>
+        <!-- Hudi -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-kafka-connect</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-java-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-utilities_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Avro/ Parquet -->
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
+        <!-- Hadoop -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+</project>
+
diff --git a/packaging/hudi-kafka-connect-bundle/src/main/java/org/apache/hudi/kafka/connect/bundle/Main.java b/packaging/hudi-kafka-connect-bundle/src/main/java/org/apache/hudi/kafka/connect/bundle/Main.java
new file mode 100644
index 0000000..3b86e54
--- /dev/null
+++ b/packaging/hudi-kafka-connect-bundle/src/main/java/org/apache/hudi/kafka/connect/bundle/Main.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.kafka.connect.bundle;
+
+import org.apache.hudi.common.util.ReflectionUtils;
+
+/**
+ * A simple main class that dumps all classes loaded in the current classpath.
+ *
+ * This is a workaround for generating sources and javadoc jars for packaging modules. The maven javadoc and sources
+ * plugins do not generate the corresponding jars if a module has no source files.
+ *
+ * This class has nothing to do with Hudi itself; it exists only to keep the mvn javadoc/source plugins happy.
+ */
+public class Main {
+
+  public static void main(String[] args) {
+    ReflectionUtils.getTopLevelClassesInClasspath(Main.class).forEach(System.out::println);
+  }
+}
diff --git a/pom.xml b/pom.xml
index 65e391a..a1beac0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,9 @@
     <module>packaging/hudi-integ-test-bundle</module>
     <module>hudi-examples</module>
     <module>hudi-flink</module>
+    <module>hudi-kafka-connect</module>
     <module>packaging/hudi-flink-bundle</module>
+    <module>packaging/hudi-kafka-connect-bundle</module>
   </modules>
 
   <licenses>