Posted to commits@hudi.apache.org by vi...@apache.org on 2021/09/14 14:15:12 UTC

[hudi] branch master updated: [HUDI-2428] Fix protocol and other issues after stress testing Hudi Kafka Connect (#3656)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9735f4b  [HUDI-2428] Fix protocol and other issues after stress testing Hudi Kafka Connect (#3656)
9735f4b is described below

commit 9735f4b8efaa3d333e496c1159bd9ae0b222519a
Author: rmahindra123 <76...@users.noreply.github.com>
AuthorDate: Tue Sep 14 07:14:58 2021 -0700

    [HUDI-2428] Fix protocol and other issues after stress testing Hudi Kafka Connect (#3656)
    
    * Fixes based on tests and some improvements
    * Fix the issues after running stress tests
    * Fixing checkstyle issues and updating README
    
    Co-authored-by: Rajesh Mahindra <rm...@Rajeshs-MacBook-Pro.local>
    Co-authored-by: Vinoth Chandar <vi...@apache.org>
---
 .../apache/hudi/schema/SchemaRegistryProvider.java | 126 ++++++++++++++
 hudi-kafka-connect/README.md                       |  65 +++++--
 .../{configs => demo}/config-sink.json             |   9 +-
 .../connect-distributed.properties                 |   2 +-
 hudi-kafka-connect/demo/setupKafka.sh              | 131 +++++++++++++++
 hudi-kafka-connect/scripts/raw.json                |   5 -
 .../scripts/runKafkaTrafficGenerator.sh            |  38 -----
 .../org/apache/hudi/connect/HoodieSinkTask.java    |  44 +++--
 .../transaction/ConnectTransactionCoordinator.java |   1 +
 .../transaction/ConnectTransactionParticipant.java |  24 +--
 .../hudi/connect/transaction/ControlEvent.java     |  17 +-
 .../transaction/TransactionParticipant.java        |   4 +-
 .../connect/writers/AbstractConnectWriter.java     |   4 +-
 .../connect/writers/BufferedConnectWriter.java     |   5 +-
 .../connect/TestConnectTransactionParticipant.java | 187 +++++++++++----------
 .../org/apache/hudi/helper/TestKafkaConnect.java   |   3 +-
 packaging/hudi-kafka-connect-bundle/pom.xml        |   2 +
 17 files changed, 475 insertions(+), 192 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaRegistryProvider.java b/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaRegistryProvider.java
new file mode 100644
index 0000000..c302c1d
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaRegistryProvider.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.schema;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Obtains latest schema from the Confluent/Kafka schema-registry.
+ * <p>
+ * https://github.com/confluentinc/schema-registry
+ */
+public class SchemaRegistryProvider extends SchemaProvider {
+
+  private final TypedProperties config;
+
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+
+    private static final String SRC_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
+    private static final String TARGET_SCHEMA_REGISTRY_URL_PROP =
+        "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+  }
+
+  /**
+   * Takes the provided {@code registryUrl} and fetches the schema from the schema registry at that url.
+   * If the caller provides userInfo credentials in the url (e.g. "https://foo:bar@schemaregistry.org"), the credentials
+   * are extracted from the url using a Matcher and set on the request as a Basic Authorization header.
+   *
+   * @param registryUrl the schema registry url, optionally containing userInfo credentials
+   * @return the Schema in String form.
+   * @throws IOException if the schema cannot be fetched from the registry
+   */
+  public String fetchSchemaFromRegistry(String registryUrl) throws IOException {
+    URL registry;
+    HttpURLConnection connection;
+    Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl);
+    if (matcher.find()) {
+      String creds = matcher.group(1);
+      String urlWithoutCreds = registryUrl.replace(creds + "@", "");
+      registry = new URL(urlWithoutCreds);
+      connection = (HttpURLConnection) registry.openConnection();
+      setAuthorizationHeader(matcher.group(1), connection);
+    } else {
+      registry = new URL(registryUrl);
+      connection = (HttpURLConnection) registry.openConnection();
+    }
+    ObjectMapper mapper = new ObjectMapper();
+    JsonNode node = mapper.readTree(getStream(connection));
+    return node.get("schema").asText();
+  }
+
+  protected void setAuthorizationHeader(String creds, HttpURLConnection connection) {
+    String encodedAuth = Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8));
+    connection.setRequestProperty("Authorization", "Basic " + encodedAuth);
+  }
+
+  protected InputStream getStream(HttpURLConnection connection) throws IOException {
+    return connection.getInputStream();
+  }
+
+  public SchemaRegistryProvider(TypedProperties props) {
+    this.config = props;
+    StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+  }
+
+  private Schema getSchema(String registryUrl) throws IOException {
+    return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+  }
+
+  @Override
+  public Schema getSourceSchema() {
+    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    try {
+      return getSchema(registryUrl);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error reading source schema from registry :" + registryUrl, ioe);
+    }
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    String targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+    try {
+      return getSchema(targetRegistryUrl);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error reading target schema from registry :" + registryUrl, ioe);
+    }
+  }
+}
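For a quick sanity check of what the provider above does, the registry lookup in fetchSchemaFromRegistry() can be
approximated from the command line. This is only a sketch: it assumes a Confluent schema registry running locally on
port 8081 with a subject named hudi-test-topic, and the foo:bar credentials mirror the placeholder from the Javadoc
example.

```bash
# Fetch the latest schema for a subject and pull out the "schema" field, as the provider does.
curl -s http://localhost:8081/subjects/hudi-test-topic/versions/latest | jq -r '.schema'

# When userInfo credentials are embedded in the registry url, the provider strips them from the url and sends
# them as an HTTP Basic Authorization header, which is equivalent to:
curl -s -u foo:bar https://schemaregistry.org/subjects/hudi-test-topic/versions/latest | jq -r '.schema'
```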
diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md
index fd0a5d0..b9ba966 100644
--- a/hudi-kafka-connect/README.md
+++ b/hudi-kafka-connect/README.md
@@ -15,32 +15,35 @@
 * See the License for the specific language governing permissions and
 -->
 
-# Quick Start guide for Kafka Connect Sink for Hudi
+# Quick Start (demo) guide for Kafka Connect Sink for Hudi
 
 This repo contains a sample project that can be used to start off your own sink connector for Kafka Connect.
 
-## Building the connector
+## Building the Hudi Sink Connector
 
 The first thing you need to do to start using this connector is building it. In order to do that, you need to install the following dependencies:
 
 - [Java 1.8+](https://openjdk.java.net/)
 - [Apache Maven](https://maven.apache.org/)
+- Install [kcat](https://github.com/edenhill/kcat)
 
-After installing these dependencies, execute the following command:
+After installing these dependencies, execute the following commands. This will install all the Hudi jars locally,
+including the packaged fat jar that contains all the dependencies required for a functional Hudi Kafka Connect Sink.
 
 ```bash
 cd $HUDI_DIR
-mvn clean package
+mvn clean -DskipTests install
 ```
 
-## Incremental Builds
+Henceforth, incremental builds can be performed as follows. 
 
 ```bash
 mvn clean -pl hudi-kafka-connect install -DskipTests
 mvn clean -pl packaging/hudi-kafka-connect-bundle install
 ```
 
-## Put hudi connector in Kafka Connect classpath
+Next, we need to make sure that the hudi sink connector bundle jar is in the Kafka Connect classpath. Note that the
+destination path below should be under the `plugin.path` configured in the Connect worker properties file.
 
 ```bash
 cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/java/hudi-kafka-connect/
@@ -52,43 +55,77 @@ After building the package, we need to install the Apache Kafka
 
 ### 1 - Starting the environment
 
-Start the ZK and Kafka:
+To try out the Connect Sink locally, set up a Kafka broker. Download the latest Apache Kafka from https://kafka.apache.org/downloads.
+Once downloaded and extracted, run the Zookeeper server and the Kafka server using the command line tools.
 
 ```bash
+export KAFKA_HOME=/path/to/kafka_install_dir
+cd $KAFKA_HOME
 ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
 ./bin/kafka-server-start.sh ./config/server.properties
 ```
 
 Wait until the kafka cluster is up and running.
 
-### 2 - Create the Hudi Control Topic for Coordination of the transactions
+### 2 - Set up the schema registry
 
-The control topic should only have `1` partition
+Hudi leverages a schema registry to obtain the latest schema when writing records. While it supports most popular schema
+registries, this demo uses the Confluent schema registry. Download the latest Confluent schema registry code from
+https://github.com/confluentinc/schema-registry and start the schema registry service.
 
 ```bash
+cd $CONFLUENT_DIR
+./bin/schema-registry-start etc/schema-registry/schema-registry.properties
+```
+
+### 3 - Create the Hudi Control Topic for Coordination of the transactions
+
+The control topic should only have `1` partition, since it is used to coordinate the Hudi write transactions across all the Connect tasks.
+
+```bash
+cd $KAFKA_HOME
 ./bin/kafka-topics.sh --delete --topic hudi-control-topic --bootstrap-server localhost:9092
 ./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
 ```
 
-### 3 - Create the Hudi Topic for the Sink and insert data into the topic
+### 4 - Create the Hudi Topic for the Sink and insert data into the topic
 
 Open a terminal to execute the following command:
 
 ```bash
-bash runKafkaTrafficGenerator.sh <total_messages>
+cd $HUDI_DIR/hudi-kafka-connect/demo/
+bash setupKafka.sh -n <total_kafka_messages>
 ```
 
 ### 5 - Run the Sink connector worker (multiple workers can be run)
 
-Open a terminal to execute the following command:
+Kafka Connect is a distributed platform, with the ability to run one or more workers (each running multiple tasks)
+that process the records from the Kafka partitions of the same topic in parallel. We provide a properties file with
+default settings to start a Connect worker for the Hudi sink.
+
+Note that if multiple workers need to be run, the REST listener port in the properties file needs to be changed for
+each subsequent worker so that all of them can run successfully.
 
 ```bash
-./bin/connect-distributed.sh ../hudi-kafka-connect/configs/connect-distributed.properties
+cd $KAFKA_HOME
+./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
 ```
 
 ### 6 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
 
+Once the Connect worker has started, it will not run the Sink until the Hudi sink is added using the REST API. The
+following curl commands can be used to delete and add a new Hudi Sink. Again, a default configuration is provided for
+the Hudi Sink, which can be changed based on the desired properties.
+
 ```bash
 curl -X DELETE http://localhost:8083/connectors/hudi-sink
-curl -X POST -H "Content-Type:application/json" -d @$HUDI-DIR/hudi-kafka-connect/configs/config-sink.json http://localhost:8083/connectors
+curl -X POST -H "Content-Type:application/json" -d @$HUDI-DIR/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
+```
+
+Now, you should see that the connector is created and tasks are running.
+
+```bash
+curl -X GET -H "Content-Type:application/json"  http://localhost:8083/connectors
+["hudi-sink"]
+curl -X GET -H "Content-Type:application/json"  http://localhost:8083/connectors/hudi-sink/status | jq
 ```
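On the note above about reconfiguring the web server when running more than one worker: the port in question is the
REST listener defined in connect-distributed.properties (listeners=HTTP://:8083). A minimal sketch of starting a second
worker, assuming it is given its own copy of the properties file with the port changed to 8084 (an example value):

```bash
# Sketch: start a second Connect worker on a different REST port.
cd $KAFKA_HOME
sed 's|listeners=HTTP://:8083|listeners=HTTP://:8084|' \
    $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties > /tmp/connect-distributed-8084.properties
./bin/connect-distributed.sh /tmp/connect-distributed-8084.properties
```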
diff --git a/hudi-kafka-connect/configs/config-sink.json b/hudi-kafka-connect/demo/config-sink.json
similarity index 56%
rename from hudi-kafka-connect/configs/config-sink.json
rename to hudi-kafka-connect/demo/config-sink.json
index 4e94bf5..75e6d84 100644
--- a/hudi-kafka-connect/configs/config-sink.json
+++ b/hudi-kafka-connect/demo/config-sink.json
@@ -9,11 +9,10 @@
 		"value.converter.schemas.enable": "false",
 		"topics": "hudi-test-topic",
 		"hoodie.table.name": "hudi-test-topic",
-		"hoodie.base.path": "file:///tmp/hoodie/sample-table",
+		"hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
 		"hoodie.datasource.write.recordkey.field": "volume",
-		"hoodie.datasource.write.partitionpath.field": "year",
-		"hoodie.schemaprovider.class": "org.apache.hudi.schema.FilebasedSchemaProvider",
-		"hoodie.deltastreamer.schemaprovider.source.schema.file": "file:///tmp/hoodie/schema.avsc",
-		"hoodie.deltastreamer.schemaprovider.target.schema.file": "file:///tmp/hoodie/schema.avsc"
+		"hoodie.datasource.write.partitionpath.field": "date",
+		"hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
+		"hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/hudi-test-topic/versions/latest"
     }
 }
diff --git a/hudi-kafka-connect/configs/connect-distributed.properties b/hudi-kafka-connect/demo/connect-distributed.properties
similarity index 94%
rename from hudi-kafka-connect/configs/connect-distributed.properties
rename to hudi-kafka-connect/demo/connect-distributed.properties
index d7d453c..9e3cec1 100644
--- a/hudi-kafka-connect/configs/connect-distributed.properties
+++ b/hudi-kafka-connect/demo/connect-distributed.properties
@@ -30,4 +30,4 @@ status.storage.replication.factor=1
 
 offset.flush.interval.ms=60000
 listeners=HTTP://:8083
-plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
+plugin.path=/usr/local/share/java
diff --git a/hudi-kafka-connect/demo/setupKafka.sh b/hudi-kafka-connect/demo/setupKafka.sh
new file mode 100644
index 0000000..f2c1735
--- /dev/null
+++ b/hudi-kafka-connect/demo/setupKafka.sh
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+#!/bin/bash
+
+## Directories
+HOME_DIR=~
+HUDI_DIR=${HOME_DIR}/hudi
+KAFKA_HOME=${HOME_DIR}/kafka
+
+#########################
+# The command line help #
+#########################
+usage() {
+    echo "Usage: $0"
+    echo "   -n |--num-kafka-records, (required) number of kafka records to generate"
+    echo "   -f |--raw-file, (optional) raw file for the kafka records"
+    echo "   -k |--kafka-topic, (optional) Topic name for Kafka"
+    echo "   -m |--num-kafka-partitions, (optional) number of kafka partitions"
+    echo "   -r |--record-key, (optional) field to use as record key"
+    echo "   -l |--num-hudi-partitions, (optional) number of hudi partitions"
+    echo "   -p |--partition-key, (optional) field to use as partition"
+    echo "   -s |--schema-file, (optional) path of the file containing the schema of the records"
+    exit 1
+}
+
+case "$1" in
+   --help)
+       usage
+       exit 0
+       ;;
+esac
+
+if [ $# -lt 1 ]; then
+    echo "Illegal number of parameters"
+    usage
+    exit 0
+fi
+
+## defaults
+rawDataFile=${HUDI_DIR}/docker/demo/data/batch_1.json
+kafkaTopicName=hudi-test-topic
+numKafkaPartitions=4
+recordKey=volume
+numHudiPartitions=5
+partitionField=date
+schemaFile=${HUDI_DIR}/docker/demo/config/schema.avsc
+
+while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
+  case $opt in
+    n) num_records="$OPTARG"
+    printf "Argument num-kafka-records is %s\n" "$num_records"
+    ;;
+    f) rawDataFile="$OPTARG"
+    printf "Argument raw-file is %s\n" "$rawDataFile"
+    ;;
+    k) kafkaTopicName="$OPTARG"
+    printf "Argument kafka-topic is %s\n" "$kafkaTopicName"
+    ;;
+    m) numKafkaPartitions="$OPTARG"
+    printf "Argument num-kafka-partitions is %s\n" "$numKafkaPartitions"
+    ;;
+    r) recordKey="$OPTARG"
+    printf "Argument record-key is %s\n" "$recordKey"
+    ;;
+    l) numHudiPartitions="$OPTARG"
+    printf "Argument num-hudi-partitions is %s\n" "$numHudiPartitions"
+    ;;
+    p) partitionField="$OPTARG"
+    printf "Argument partition-key is %s\n" "$partitionField"
+    ;;
+    s) schemaFile="$OPTARG"
+    printf "Argument schema-file is %s\n" "$schemaFile"
+    ;;
+    -) echo "Invalid option -$OPTARG" >&2
+    ;;
+esac
+done
+
+# First delete the existing topic
+$KAFKA_HOME/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
+
+# Create the topic with the configured number of partitions
+$KAFKA_HOME/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
+
+
+# Register the schema with the schema registry
+export SCHEMA=`sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring`
+curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $SCHEMA}" http://localhost:8081/subjects/${kafkaTopicName}/versions
+curl -X GET http://localhost:8081/subjects/${kafkaTopicName}/versions/latest
+
+
+# Generate kafka messages from raw records
+# Each record gets a unique key, and messages are generated evenly across the hudi partitions
+partitions=()
+for ((i=0; i<${numHudiPartitions}; i++))
+do
+    partitions[$i]="partition-"$i;
+done
+
+for ((recordValue=0; recordValue<=${num_records}; ))
+do 
+    while IFS= read line 
+    do
+        for partitionValue in "${partitions[@]}"
+        do
+            echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' | kcat -P -b localhost:9092 -t ${kafkaTopicName};
+            ((recordValue++));
+            if [ $recordValue -gt ${num_records} ]; then
+                exit 0
+            fi
+        done
+        
+        if [ $(( $recordValue % 1000 )) -eq 0 ]
+            then sleep 1
+        fi
+    done < "$rawDataFile"
+done 
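For reference, a sketch of how the generator above can be invoked; the values are examples only, and anything not
overridden falls back to the defaults baked into the script (topic hudi-test-topic, 4 Kafka partitions, 5 hudi
partitions, the docker demo raw file and schema):

```bash
cd $HUDI_DIR/hudi-kafka-connect/demo
# Produce 100000 messages into a custom topic spread over 8 Kafka partitions.
bash setupKafka.sh -n 100000 -k my-hudi-topic -m 8
# Use a different raw data file and schema file.
bash setupKafka.sh -n 1000 -f /path/to/raw.json -s /path/to/schema.avsc
```

If a custom topic is used, remember to point the sink configuration ("topics" in config-sink.json) at the same topic.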
diff --git a/hudi-kafka-connect/scripts/raw.json b/hudi-kafka-connect/scripts/raw.json
deleted file mode 100644
index aa2cc70..0000000
--- a/hudi-kafka-connect/scripts/raw.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{"volume": 0, "symbol": "TPNL", "ts": "2017-08-31 09:30:00", "month": "08", "high": 6.37, "low": 1.37, "key": "TPNL_2017-08-31 09", "year": 2017, "date": "2017/08/31", "close": 4.44, "open": 1.37, "day": "31"}
-{"volume": 0, "symbol": "SPOT", "ts": "2018-08-31 09:30:00", "month": "08", "high": 1.87, "low": 0.37, "key": "TPNL_2018-08-31 09", "year": 2018, "date": "2018/08/31", "close": 1.44, "open": 1.77, "day": "31"}
-{"volume": 0, "symbol": "GOOG", "ts": "2019-08-31 09:30:00", "month": "08", "high": 2.1, "low": 1.7, "key": "TPNL_2019-08-31 09", "year": 2019, "date": "2019/08/31", "close": 1.94, "open": 2.0, "day": "31"}
-{"volume": 0, "symbol": "MSFT", "ts": "2020-08-31 09:30:00", "month": "08", "high": 3.33, "low": 0.87, "key": "TPNL_2020-08-31 09", "year": 2020, "date": "2020/08/31", "close": 3.33, "open": 3.1, "day": "31"}
-{"volume": 0, "symbol": "APPL", "ts": "2021-08-31 09:30:00", "month": "08", "high": 3.17, "low": 2.37, "key": "TPNL_2021-08-31 09", "year": 2021, "date": "2021/08/31", "close": 2.66, "open": 3.1, "day": "31"}
diff --git a/hudi-kafka-connect/scripts/runKafkaTrafficGenerator.sh b/hudi-kafka-connect/scripts/runKafkaTrafficGenerator.sh
deleted file mode 100644
index cff4140..0000000
--- a/hudi-kafka-connect/scripts/runKafkaTrafficGenerator.sh
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-#!/bin/bash
-
-# First delete the existing topic
-$KAFKA_HOME/bin/kafka-topics.sh --delete --topic hudi-test-topic --bootstrap-server localhost:9092
-
-# Create the topic with 4 partitions
-$KAFKA_HOME/bin/kafka-topics.sh --create --topic hudi-test-topic --partitions 4 --replication-factor 1 --bootstrap-server localhost:9092
-
-# Generate kafka messages from raw records
-inputFile="raw.json"
-# Generate the records with unique keys
-for ((recordKey=0; recordKey<=$1;  ))
-do 
-	while IFS= read line 
-	do
-		echo $line |  jq --argjson recordKey $recordKey -c '.volume = $recordKey' | kcat -P -b localhost:9092 -t hudi-test-topic
-		((recordKey++))
-		if [ $(( $recordKey % 1000 )) -eq 0 ]
-			then sleep 1
-		fi
-	done < "$inputFile"
-done 
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
index c7dde9a..a937a8b 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
@@ -30,11 +30,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -52,7 +54,7 @@ public class HoodieSinkTask extends SinkTask {
   private static final int COORDINATOR_KAFKA_PARTITION = 0;
 
   private final Map<TopicPartition, TransactionCoordinator> transactionCoordinators;
-  private final Map<TopicPartition, TransactionParticipant> hudiTransactionParticipants;
+  private final Map<TopicPartition, TransactionParticipant> transactionParticipants;
   private KafkaConnectControlAgent controlKafkaClient;
   private KafkaConnectConfigs connectConfigs;
 
@@ -60,8 +62,8 @@ public class HoodieSinkTask extends SinkTask {
   private String connectorName;
 
   public HoodieSinkTask() {
-    transactionCoordinators = new HashMap();
-    hudiTransactionParticipants = new HashMap<>();
+    transactionCoordinators = new HashMap<>();
+    transactionParticipants = new HashMap<>();
   }
 
   @Override
@@ -80,7 +82,6 @@ public class HoodieSinkTask extends SinkTask {
       controlKafkaClient = KafkaConnectControlAgent.createKafkaControlManager(
           connectConfigs.getBootstrapServers(),
           connectConfigs.getControlTopicName());
-      bootstrap(context.assignment());
     } catch (ConfigException e) {
       throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", e);
     } catch (ConnectException e) {
@@ -98,11 +99,25 @@ public class HoodieSinkTask extends SinkTask {
       String topic = record.topic();
       int partition = record.kafkaPartition();
       TopicPartition tp = new TopicPartition(topic, partition);
-      hudiTransactionParticipants.get(tp).buffer(record);
+
+      TransactionParticipant transactionParticipant = transactionParticipants.get(tp);
+      if (transactionParticipant != null) {
+        transactionParticipant.buffer(record);
+      }
     }
 
     for (TopicPartition partition : context.assignment()) {
-      hudiTransactionParticipants.get(partition).processRecords();
+      if (transactionParticipants.get(partition) == null) {
+        throw new RetriableException("TransactionParticipant should be created for each assigned partition, "
+            + "but has not been created for the topic/partition: " + partition.topic() + ":" + partition.partition());
+      }
+      try {
+        transactionParticipants.get(partition).processRecords();
+      } catch (IOException exception) {
+        throw new RetriableException("Intermittent write errors for Hudi "
+            + " for the topic/partition: " + partition.topic() + ":" + partition.partition()
+            + " , ensuring kafka connect will retry ", exception);
+      }
     }
   }
 
@@ -123,12 +138,9 @@ public class HoodieSinkTask extends SinkTask {
     // committed to Hudi.
     Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
     for (TopicPartition partition : context.assignment()) {
-      TransactionParticipant worker = hudiTransactionParticipants.get(partition);
-      if (worker != null) {
-        worker.processRecords();
-        if (worker.getLastKafkaCommittedOffset() >= 0) {
-          result.put(partition, new OffsetAndMetadata(worker.getLastKafkaCommittedOffset()));
-        }
+      TransactionParticipant worker = transactionParticipants.get(partition);
+      if (worker != null && worker.getLastKafkaCommittedOffset() >= 0) {
+        result.put(partition, new OffsetAndMetadata(worker.getLastKafkaCommittedOffset()));
       }
     }
     return result;
@@ -158,7 +170,7 @@ public class HoodieSinkTask extends SinkTask {
           transactionCoordinators.remove(partition);
         }
       }
-      TransactionParticipant worker = hudiTransactionParticipants.remove(partition);
+      TransactionParticipant worker = transactionParticipants.remove(partition);
       if (worker != null) {
         try {
           LOG.debug("Closing data writer due to task start failure.");
@@ -185,7 +197,7 @@ public class HoodieSinkTask extends SinkTask {
           transactionCoordinators.put(partition, coordinator);
         }
         ConnectTransactionParticipant worker = new ConnectTransactionParticipant(connectConfigs, partition, controlKafkaClient, context);
-        hudiTransactionParticipants.put(partition, worker);
+        transactionParticipants.put(partition, worker);
         worker.start();
       } catch (HoodieException exception) {
         LOG.error(String.format("Fatal error initializing task %s for partition %s", taskId, partition.partition()), exception);
@@ -195,7 +207,7 @@ public class HoodieSinkTask extends SinkTask {
 
   private void cleanup() {
     for (TopicPartition partition : context.assignment()) {
-      TransactionParticipant worker = hudiTransactionParticipants.get(partition);
+      TransactionParticipant worker = transactionParticipants.get(partition);
       if (worker != null) {
         try {
           LOG.debug("Closing data writer due to task start failure.");
@@ -205,7 +217,7 @@ public class HoodieSinkTask extends SinkTask {
         }
       }
     }
-    hudiTransactionParticipants.clear();
+    transactionParticipants.clear();
     transactionCoordinators.forEach((topic, transactionCoordinator) -> transactionCoordinator.stop());
     transactionCoordinators.clear();
   }
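The put() changes above turn intermittent writer failures into RetriableException, so Kafka Connect treats them as
transient and re-delivers the same batch instead of killing the task. If a task does fail for a non-retriable reason,
it can be inspected and restarted through the Connect REST API; a minimal sketch, assuming the connector is named
hudi-sink (as in the demo config) and the failed task id is 0:

```bash
# Show per-task state for the hudi-sink connector, then restart task 0.
curl -s http://localhost:8083/connectors/hudi-sink/status | jq '.tasks[] | {id, state}'
curl -s -X POST http://localhost:8083/connectors/hudi-sink/tasks/0/restart
```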
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
index 13291c8..73a30c6 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
@@ -131,6 +131,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
   @Override
   public void stop() {
     kafkaControlClient.deregisterTransactionCoordinator(this);
+    scheduler.shutdownNow();
     hasStarted.set(false);
     if (executorService != null) {
       boolean terminated = false;
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
index fe1996e..c395071 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
@@ -32,7 +32,6 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -111,7 +110,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
   }
 
   @Override
-  public void processRecords() {
+  public void processRecords() throws IOException {
     while (!controlEvents.isEmpty()) {
       ControlEvent message = controlEvents.poll();
       switch (message.getMsgType()) {
@@ -153,7 +152,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
     }
   }
 
-  private void handleEndCommit(ControlEvent message) {
+  private void handleEndCommit(ControlEvent message) throws IOException {
     if (ongoingTransactionInfo == null) {
       LOG.warn(String.format("END_COMMIT %s is received while we were NOT in active transaction", message.getCommitTime()));
       return;
@@ -167,28 +166,23 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
       return;
     }
 
+    context.pause(partition);
+    ongoingTransactionInfo.commitInitiated();
     // send Writer Status Message and wait for ACK_COMMIT in async fashion
     try {
-      context.pause(partition);
-      ongoingTransactionInfo.commitInitiated();
       //sendWriterStatus
-      List<WriteStatus> writeStatuses = new ArrayList<>();
-      try {
-        writeStatuses = ongoingTransactionInfo.getWriter().close();
-      } catch (IOException exception) {
-        LOG.warn("Error closing the Hudi Writer", exception);
-      }
-
-      ControlEvent writeStatus = new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS,
+      List<WriteStatus> writeStatuses = ongoingTransactionInfo.getWriter().close();
+      ControlEvent writeStatusEvent = new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS,
           ControlEvent.SenderType.PARTICIPANT, ongoingTransactionInfo.getCommitTime(), partition)
           .setParticipantInfo(new ControlEvent.ParticipantInfo(
               writeStatuses,
               ongoingTransactionInfo.getLastWrittenKafkaOffset(),
               ControlEvent.OutcomeType.WRITE_SUCCESS))
           .build();
-      kafkaControlAgent.publishMessage(writeStatus);
+      kafkaControlAgent.publishMessage(writeStatusEvent);
     } catch (Exception exception) {
-      LOG.warn(String.format("Error ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
+      LOG.error(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
+      throw new IOException(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
     }
   }
 
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java
index 0930648..5a35e7a 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * The events sent over the Kafka Control Topic between the
@@ -108,7 +109,9 @@ public class ControlEvent implements Serializable {
   @Override
   public String toString() {
     return String.format("%s %s %s %s %s %s", version, msgType.name(), commitTime,
-        Arrays.toString(senderPartition), coordinatorInfo.toString(), participantInfo.toString());
+        Arrays.toString(senderPartition),
+        (coordinatorInfo == null) ? "" : coordinatorInfo.toString(),
+        (participantInfo == null) ? "" : participantInfo.toString());
   }
 
   /**
@@ -163,6 +166,13 @@ public class ControlEvent implements Serializable {
     public Map<Integer, Long> getGlobalKafkaCommitOffsets() {
       return (globalKafkaCommitOffsets == null) ? new HashMap<>() : globalKafkaCommitOffsets;
     }
+
+    @Override
+    public String toString() {
+      return getGlobalKafkaCommitOffsets().entrySet().stream()
+          .map(entry -> entry.getKey() + "=" + entry.getValue())
+          .collect(Collectors.joining(", ", "{", "}"));
+    }
   }
 
   /**
@@ -199,6 +209,11 @@ public class ControlEvent implements Serializable {
     public OutcomeType getOutcomeType() {
       return outcomeType;
     }
+
+    @Override
+    public String toString() {
+      return String.format("%s %s %s", Arrays.toString(writeStatusList), kafkaCommitOffset, outcomeType.name());
+    }
   }
 
   /**
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java
index 0179f3b..c19d1b8 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java
@@ -21,6 +21,8 @@ package org.apache.hudi.connect.transaction;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
 
+import java.io.IOException;
+
 /**
  * Interface for the Participant that
  * manages Writes for a
@@ -35,7 +37,7 @@ public interface TransactionParticipant {
 
   void buffer(SinkRecord record);
 
-  void processRecords();
+  void processRecords() throws IOException;
 
   TopicPartition getPartition();
 
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
index c958b2b..3d8e5f8 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -81,11 +81,11 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
   }
 
   @Override
-  public List<WriteStatus> close() {
+  public List<WriteStatus> close() throws IOException {
     return flushHudiRecords();
   }
 
   protected abstract void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record);
 
-  protected abstract List<WriteStatus> flushHudiRecords();
+  protected abstract List<WriteStatus> flushHudiRecords() throws IOException;
 }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
index 3319604..a60293d 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.keygen.KeyGenerator;
@@ -94,7 +93,7 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
   }
 
   @Override
-  public List<WriteStatus> flushHudiRecords() {
+  public List<WriteStatus> flushHudiRecords() throws IOException {
     try {
       LOG.info("Number of entries in MemoryBasedMap => "
           + bufferedRecords.getInMemoryMapNumEntries()
@@ -114,7 +113,7 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
           + writeStatuses);
       return writeStatuses;
     } catch (Exception e) {
-      throw new HoodieException("Write records failed", e);
+      throw new IOException("Write records failed", e);
     }
   }
 }
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
index 900ba46..4e5aaa1 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
@@ -68,44 +68,47 @@ public class TestConnectTransactionParticipant {
   @EnumSource(value = CoordinatorFailureTestScenarios.class)
   public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios testScenario) {
     int expectedRecordsWritten = 0;
-    switch (testScenario) {
-      case REGULAR_SCENARIO:
-        expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
-        assertTrue(testKafkaConnect.isPaused());
-        break;
-      case COORDINATOR_FAILED_AFTER_START_COMMIT:
-        testKafkaConnect.putRecordsToParticipant();
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
-        testKafkaConnect.putRecordsToParticipant();
-        // Coordinator Failed
-        initializeCoordinator();
-        break;
-      case COORDINATOR_FAILED_AFTER_END_COMMIT:
-        testKafkaConnect.putRecordsToParticipant();
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
-        testKafkaConnect.putRecordsToParticipant();
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
-        expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
-        // Coordinator Failed
-        initializeCoordinator();
-        break;
-      default:
-        throw new HoodieException("Unknown test scenario " + testScenario);
-    }
-
-    // Regular Case or Coordinator Recovery Case
-    coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
-    expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
-    assertTrue(testKafkaConnect.isResumed());
-    coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
-    testKafkaConnect.putRecordsToParticipant();
-    assertTrue(testKafkaConnect.isPaused());
-    coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
-    testKafkaConnect.putRecordsToParticipant();
-    assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
-    // Ensure Coordinator and participant are in sync in the kafka offsets
-    assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+    try {
+      switch (testScenario) {
+        case REGULAR_SCENARIO:
+          expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
+          assertTrue(testKafkaConnect.isPaused());
+          break;
+        case COORDINATOR_FAILED_AFTER_START_COMMIT:
+          testKafkaConnect.putRecordsToParticipant();
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+          testKafkaConnect.putRecordsToParticipant();
+          // Coordinator Failed
+          initializeCoordinator();
+          break;
+        case COORDINATOR_FAILED_AFTER_END_COMMIT:
+          testKafkaConnect.putRecordsToParticipant();
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+          testKafkaConnect.putRecordsToParticipant();
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+          expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
+          // Coordinator Failed
+          initializeCoordinator();
+          break;
+        default:
+          throw new HoodieException("Unknown test scenario " + testScenario);
+      }
 
+      // Regular Case or Coordinator Recovery Case
+      coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+      expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
+      assertTrue(testKafkaConnect.isResumed());
+      coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+      testKafkaConnect.putRecordsToParticipant();
+      assertTrue(testKafkaConnect.isPaused());
+      coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
+      testKafkaConnect.putRecordsToParticipant();
+      assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
+      // Ensure Coordinator and participant are in sync in the kafka offsets
+      assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+    } catch (Exception exception) {
+      throw new HoodieException("Unexpected test failure ", exception);
+    }
     participant.stop();
   }
 
@@ -113,59 +116,63 @@ public class TestConnectTransactionParticipant {
   @EnumSource(value = ParticipantFailureTestScenarios.class)
   public void testAllParticipantFailureScenarios(ParticipantFailureTestScenarios testScenario) {
     int expectedRecordsWritten = 0;
-    switch (testScenario) {
-      case FAILURE_BEFORE_START_COMMIT:
-        testKafkaConnect.putRecordsToParticipant();
-        // Participant fails
-        initializeParticipant();
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
-        expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
-        assertTrue(testKafkaConnect.isResumed());
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
-        testKafkaConnect.putRecordsToParticipant();
-        assertTrue(testKafkaConnect.isPaused());
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
-        testKafkaConnect.putRecordsToParticipant();
-        assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
-        // Ensure Coordinator and participant are in sync in the kafka offsets
-        assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
-        break;
-      case FAILURE_AFTER_START_COMMIT:
-        testKafkaConnect.putRecordsToParticipant();
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
-        testKafkaConnect.putRecordsToParticipant();
-        // Participant fails
-        initializeParticipant();
-        testKafkaConnect.putRecordsToParticipant();
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
-        testKafkaConnect.putRecordsToParticipant();
-        assertTrue(testKafkaConnect.isPaused());
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
-        testKafkaConnect.putRecordsToParticipant();
-        assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
-        // Ensure Coordinator and participant are in sync in the kafka offsets
-        assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
-        break;
-      case FAILURE_AFTER_END_COMMIT:
-        testKafkaConnect.putRecordsToParticipant();
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
-        testKafkaConnect.putRecordsToParticipant();
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
-        testKafkaConnect.putRecordsToParticipant();
-        // Participant fails
-        initializeParticipant();
-        testKafkaConnect.putRecordsToParticipant();
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
-        testKafkaConnect.putRecordsToParticipant();
-        assertTrue(testKafkaConnect.isPaused());
-        coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
-        testKafkaConnect.putRecordsToParticipant();
-        assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
-        // Ensure Coordinator and participant are in sync in the kafka offsets
-        assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
-        break;
-      default:
-        throw new HoodieException("Unknown test scenario " + testScenario);
+    try {
+      switch (testScenario) {
+        case FAILURE_BEFORE_START_COMMIT:
+          testKafkaConnect.putRecordsToParticipant();
+          // Participant fails
+          initializeParticipant();
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+          expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant();
+          assertTrue(testKafkaConnect.isResumed());
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+          testKafkaConnect.putRecordsToParticipant();
+          assertTrue(testKafkaConnect.isPaused());
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
+          testKafkaConnect.putRecordsToParticipant();
+          assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
+          // Ensure Coordinator and participant are in sync in the kafka offsets
+          assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+          break;
+        case FAILURE_AFTER_START_COMMIT:
+          testKafkaConnect.putRecordsToParticipant();
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+          testKafkaConnect.putRecordsToParticipant();
+          // Participant fails
+          initializeParticipant();
+          testKafkaConnect.putRecordsToParticipant();
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+          testKafkaConnect.putRecordsToParticipant();
+          assertTrue(testKafkaConnect.isPaused());
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
+          testKafkaConnect.putRecordsToParticipant();
+          assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
+          // Ensure Coordinator and participant are in sync in the kafka offsets
+          assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+          break;
+        case FAILURE_AFTER_END_COMMIT:
+          testKafkaConnect.putRecordsToParticipant();
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT);
+          testKafkaConnect.putRecordsToParticipant();
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+          testKafkaConnect.putRecordsToParticipant();
+          // Participant fails
+          initializeParticipant();
+          testKafkaConnect.putRecordsToParticipant();
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT);
+          testKafkaConnect.putRecordsToParticipant();
+          assertTrue(testKafkaConnect.isPaused());
+          coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT);
+          testKafkaConnect.putRecordsToParticipant();
+          assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten);
+          // Ensure Coordinator and participant are in sync in the kafka offsets
+          assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset());
+          break;
+        default:
+          throw new HoodieException("Unknown test scenario " + testScenario);
+      }
+    } catch (Exception exception) {
+      throw new HoodieException("Unexpected test failure ", exception);
     }
   }
 
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java
index 9530809..6e947de 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java
@@ -25,6 +25,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTaskContext;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
@@ -60,7 +61,7 @@ public class TestKafkaConnect implements SinkTaskContext {
     return !isPaused;
   }
 
-  public int putRecordsToParticipant() {
+  public int putRecordsToParticipant() throws IOException {
     for (int i = 1; i <= NUM_RECORDS_BATCH; i++) {
       participant.buffer(getNextKafkaRecord());
     }
diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml
index 14bc4e4..cf81096 100644
--- a/packaging/hudi-kafka-connect-bundle/pom.xml
+++ b/packaging/hudi-kafka-connect-bundle/pom.xml
@@ -74,6 +74,8 @@
                                     <exclude>commons-httpclient:commons-httpclient</exclude>
                                     <exclude>org.apache.htrace:htrace-core</exclude>
                                     <exclude>org.jamon:jamon-runtime</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>log4j:log4j</exclude>
                                     <exclude>jdk.tools:jdk.tools</exclude>
                                     <exclude>junit:junit</exclude>
                                 </excludes>
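A quick way to sanity-check the new exclusions is to list the bundle contents and confirm that no slf4j/log4j classes
are packaged; a sketch, using the bundle jar name referenced in the README:

```bash
jar tf $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar \
  | grep -i -e 'org/slf4j' -e 'org/apache/log4j' \
  || echo "no bundled slf4j/log4j classes"
```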