You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 07:40:57 UTC

[camel-kafka-connector] branch master updated: Make producing test messages for sink tests more flexible

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 87d63c2  Make producing test messages for sink tests more flexible
87d63c2 is described below

commit 87d63c21e1714ca7d74f93e9047df7863940e85f
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 18:26:19 2021 +0100

    Make producing test messages for sink tests more flexible
---
 .../aws/v1/sns/sink/CamelSinkAWSSNSITCase.java     |  6 --
 .../aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java     |  6 --
 .../aws/v2/cw/sink/CamelSinkAWSCWITCase.java       | 60 ++++++++++---------
 .../aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java     | 69 +++++++++++-----------
 .../aws/v2/iam/sink/CamelSinkAWSIAMITCase.java     | 52 ++++++++--------
 .../v2/kinesis/sink/CamelSinkAWSKinesisITCase.java | 21 ++++---
 .../aws/v2/kms/sink/CamelSinkAWSKMSITCase.java     | 25 +++++---
 .../aws/v2/s3/sink/CamelSinkAWSS3ITCase.java       | 26 +++++---
 .../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java     |  5 --
 .../blob/sink/CamelSinkAzureStorageBlobITCase.java | 47 ++++++++-------
 .../sink/CamelSinkAzureStorageQueueITCase.java     |  6 --
 .../cassandra/sink/CamelSinkCassandraITCase.java   |  6 --
 .../common/test/AbstractTestMessageProducer.java   | 59 ++++++++++++++++++
 .../common/test/CamelSinkTestSupport.java          | 64 ++++++++++----------
 ...cer.java => FunctionalTestMessageProducer.java} |  4 +-
 ...ageProducer.java => StringMessageProducer.java} | 29 ++++++++-
 .../common/test/TestMessageProducer.java           | 13 +++-
 .../couchbase/sink/CamelSinkCouchbaseITCase.java   | 43 ++++++++------
 .../sink/CamelSinkElasticSearchITCase.java         |  7 ---
 .../file/sink/CamelSinkFileITCase.java             | 27 +++++----
 .../hdfs/sink/CamelSinkHDFSITCase.java             | 25 ++++----
 .../http/sink/CamelSinkHTTPITCase.java             |  5 --
 .../jdbc/sink/CamelSinkJDBCITCase.java             | 41 +++++++------
 .../mongodb/sink/CamelSinkMongoDBITCase.java       | 25 ++++----
 .../rabbitmq/sink/RabbitMQSinkITCase.java          |  6 --
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   |  4 --
 .../sjms2/sink/CamelSinkJMSITCase.java             |  6 --
 .../sql/sink/CamelSinkSQLITCase.java               | 40 ++++++++-----
 .../ssh/sink/CamelSinkSshITCase.java               | 23 ++++----
 .../syslog/sink/CamelSinkSyslogITCase.java         | 24 ++++----
 30 files changed, 434 insertions(+), 340 deletions(-)

diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
index 8d893c9..aea8c76 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
@@ -18,7 +18,6 @@
 package org.apache.camel.kafkaconnector.aws.v1.sns.sink;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -81,11 +80,6 @@ public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
-
-    @Override
     protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
         if (latch.await(120, TimeUnit.SECONDS)) {
             assertEquals(expect, received,
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
index b38441f..894114f 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -18,7 +18,6 @@
 package org.apache.camel.kafkaconnector.aws.v1.sqs.sink;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -91,11 +90,6 @@ public class CamelSinkAWSSQSITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
-
-    @Override
     protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
         if (latch.await(110, TimeUnit.SECONDS)) {
             assertEquals(expect, received,
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
index 9b27827..550ea47 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -62,6 +63,23 @@ public class CamelSinkAWSCWITCase extends CamelSinkTestSupport {
     private volatile int received;
     private final int expect = 10;
 
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+
+        @Override
+        public Map<String, String> messageHeaders(String text, int current) {
+            Map<String, String> headers = new HashMap<>();
+
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionName",
+                    "test-dimension-" + current);
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionValue", String.valueOf(current));
+
+            return headers;
+        }
+    }
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-aws2-cw-kafka-connector"};
@@ -78,17 +96,6 @@ public class CamelSinkAWSCWITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        Map<String, String> headers = new HashMap<>();
-
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionName",
-                "test-dimension-" + current);
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionValue", String.valueOf(current));
-
-        return headers;
-    }
-
-    @Override
     protected void consumeMessages(CountDownLatch latch) {
         try {
             ListMetricsRequest request = ListMetricsRequest.builder()
@@ -134,23 +141,18 @@ public class CamelSinkAWSCWITCase extends CamelSinkTestSupport {
 
     @Test
     @Timeout(value = 120)
-    public void testBasicSendReceive() {
-        try {
-            Properties amazonProperties = awsService.getConnectionProperties();
-            String topicName = TestUtils.getDefaultTestTopic(this.getClass());
-
-            ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory
-                    .basic()
-                    .withTopics(topicName)
-                    .withConfiguration(TestCloudWatchConfiguration.class.getName())
-                    .withAmazonConfig(amazonProperties)
-                    .withName(metricName)
-                    .withSinkPathNamespace(namespace);
-
-            runTest(testProperties, topicName, expect);
-        } catch (Exception e) {
-            LOG.error("Amazon CloudWatch test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testBasicSendReceive() throws Exception {
+        Properties amazonProperties = awsService.getConnectionProperties();
+        String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+        ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withConfiguration(TestCloudWatchConfiguration.class.getName())
+                .withAmazonConfig(amazonProperties)
+                .withName(metricName)
+                .withSinkPathNamespace(namespace);
+
+        runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
index ee6e350..0c64bcf 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
@@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguration;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -56,6 +57,26 @@ public class CamelSinkAWSEC2ITCase extends CamelSinkTestSupport {
     private volatile int received;
     private final int expect = 10;
 
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+
+        @Override
+        public Map<String, String> messageHeaders(String text, int current) {
+            Map<String, String> headers = new HashMap<>();
+
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2ImageId",
+                    "image-id-" + current);
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceType", "T1_MICRO");
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMinCount", "1");
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMaxCount", "1");
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceSecurityGroups", "default");
+
+            return headers;
+        }
+    }
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-aws2-ec2-kafka-connector"};
@@ -69,19 +90,6 @@ public class CamelSinkAWSEC2ITCase extends CamelSinkTestSupport {
         received = 0;
     }
 
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        Map<String, String> headers = new HashMap<>();
-
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2ImageId",
-                "image-id-" + current);
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceType", "T1_MICRO");
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMinCount", "1");
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMaxCount", "1");
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceSecurityGroups", "default");
-
-        return headers;
-    }
 
     @Override
     protected void consumeMessages(CountDownLatch latch) {
@@ -119,31 +127,24 @@ public class CamelSinkAWSEC2ITCase extends CamelSinkTestSupport {
             fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
                     received, expect));
         }
-
-
     }
 
     @Test
     @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            Properties amazonProperties = awsService.getConnectionProperties();
-            String topicName = TestUtils.getDefaultTestTopic(this.getClass());
-
-            ConnectorPropertyFactory testProperties = CamelAWSEC2PropertyFactory
-                    .basic()
-                    .withTopics(topicName)
-                    .withConfiguration(TestCloudWatchConfiguration.class.getName())
-                    .withAmazonConfig(amazonProperties)
-                    .withSinkPathLabel(logicalName)
-                    .withConfiguration(TestEC2Configuration.class.getName())
-                    .withSinkEndpointOperation("createAndRunInstances");
-
-            runTest(testProperties, topicName, expect);
-        } catch (Exception e) {
-            LOG.error("Amazon EC2 test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testBasicSendReceive() throws Exception {
+        Properties amazonProperties = awsService.getConnectionProperties();
+        String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+        ConnectorPropertyFactory testProperties = CamelAWSEC2PropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withConfiguration(TestCloudWatchConfiguration.class.getName())
+                .withAmazonConfig(amazonProperties)
+                .withSinkPathLabel(logicalName)
+                .withConfiguration(TestEC2Configuration.class.getName())
+                .withSinkEndpointOperation("createAndRunInstances");
+
+        runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 
 
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
index 7c212bc..f88e078 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
@@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguration;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -56,14 +57,20 @@ public class CamelSinkAWSIAMITCase extends CamelSinkTestSupport {
     private volatile int received;
     private final int expect = 10;
 
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        Map<String, String> headers = new HashMap<>();
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+
+        @Override
+        public Map<String, String> messageHeaders(String text, int current) {
+            Map<String, String> headers = new HashMap<>();
 
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsIAMUsername",
-                "username-" + current);
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsIAMUsername",
+                    "username-" + current);
 
-        return headers;
+            return headers;
+        }
     }
 
     @Override
@@ -117,24 +124,19 @@ public class CamelSinkAWSIAMITCase extends CamelSinkTestSupport {
 
     @Test
     @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            Properties amazonProperties = awsService.getConnectionProperties();
-            String topicName = TestUtils.getDefaultTestTopic(this.getClass());
-
-            ConnectorPropertyFactory testProperties = CamelAWSIAMPropertyFactory
-                    .basic()
-                    .withTopics(topicName)
-                    .withConfiguration(TestCloudWatchConfiguration.class.getName())
-                    .withAmazonConfig(amazonProperties)
-                    .withSinkPathLabel(logicalName)
-                    .withConfiguration(TestIAMConfiguration.class.getName())
-                    .withSinkEndpointOperation("createUser");
-
-            runTest(testProperties, topicName, expect);
-        } catch (Exception e) {
-            LOG.error("Amazon IAM test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testBasicSendReceive() throws Exception {
+        Properties amazonProperties = awsService.getConnectionProperties();
+        String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+        ConnectorPropertyFactory testProperties = CamelAWSIAMPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withConfiguration(TestCloudWatchConfiguration.class.getName())
+                .withAmazonConfig(amazonProperties)
+                .withSinkPathLabel(logicalName)
+                .withConfiguration(TestIAMConfiguration.class.getName())
+                .withSinkEndpointOperation("createUser");
+
+        runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
index b975a3d..9ca84ef 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
@@ -29,6 +29,7 @@ import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils;
 import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfiguration;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -64,14 +65,20 @@ public class CamelSinkAWSKinesisITCase  extends CamelSinkTestSupport {
     private volatile int received;
     private final int expect = 10;
 
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        Map<String, String> headers = new HashMap<>();
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
 
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKinesisPartitionKey",
-                "partition-" + current);
+        @Override
+        public Map<String, String> messageHeaders(String text, int current) {
+            Map<String, String> headers = new HashMap<>();
 
-        return headers;
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKinesisPartitionKey",
+                    "partition-" + current);
+
+            return headers;
+        }
     }
 
     @Override
@@ -143,6 +150,6 @@ public class CamelSinkAWSKinesisITCase  extends CamelSinkTestSupport {
                 .withConfiguration(TestKinesisConfiguration.class.getName())
                 .withStreamName(streamName);
 
-        runTest(connectorPropertyFactory, topicName, expect);
+        runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
index 4f57799..f58c54a 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -62,18 +63,24 @@ public class CamelSinkAWSKMSITCase extends CamelSinkTestSupport {
     private volatile int received;
     private final int expect = 10;
 
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        Map<String, String> headers = new HashMap<>();
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
 
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSKeyId",
-                String.valueOf(current));
+        @Override
+        public Map<String, String> messageHeaders(String text, int current) {
+            Map<String, String> headers = new HashMap<>();
 
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSDescription",
-                "test key " + current);
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSKeyId",
+                    String.valueOf(current));
 
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSDescription",
+                    "test key " + current);
 
-        return headers;
+
+            return headers;
+        }
     }
 
     @Override
@@ -146,6 +153,6 @@ public class CamelSinkAWSKMSITCase extends CamelSinkTestSupport {
                 .withSinkEndpointOperation("createKey")
                 .withSinkPathLabel(logicalName);
 
-        runTest(connectorPropertyFactory, topicName, expect);
+        runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
index ea77a09..1311507 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
@@ -29,6 +29,7 @@ import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils;
 import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -59,18 +60,25 @@ public class CamelSinkAWSS3ITCase extends CamelSinkTestSupport {
     private volatile int received;
     private int expect = 10;
 
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        Map<String, String> headers = new HashMap<>();
+    private class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+
+        @Override
+        public Map<String, String> messageHeaders(String text, int current) {
+            Map<String, String> headers = new HashMap<>();
 
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3Key",
-                "file" + current + ".txt");
-        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3BucketName",
-                bucketName);
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3Key",
+                    "file" + current + ".txt");
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3BucketName",
+                    bucketName);
 
-        return headers;
+            return headers;
+        }
     }
 
+
     @Override
     protected void consumeMessages(CountDownLatch latch) {
         try {
@@ -147,6 +155,6 @@ public class CamelSinkAWSS3ITCase extends CamelSinkTestSupport {
                 .withBucketNameOrArn(bucketName)
                 .withAutoCreateBucket(true);
 
-        runTest(testProperties, topicName, expect);
+        runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
index 85b305b..c686377 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -18,7 +18,6 @@
 package org.apache.camel.kafkaconnector.aws.v2.sqs.sink;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -88,10 +87,6 @@ public class CamelSinkAWSSQSITCase extends CamelSinkTestSupport {
         }
     }
 
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
 
     @Override
     protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
index 1bbf9f1..727d3fb 100644
--- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
@@ -30,6 +30,7 @@ import com.azure.storage.blob.models.BlobItem;
 import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.azure.common.AzureCredentialsHolder;
 import org.apache.camel.test.infra.azure.common.services.AzureService;
@@ -62,6 +63,30 @@ public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
     private int expect = 10;
     private int received;
 
+    private class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+
+        @Override
+        public String testMessageContent(int current) {
+            return "test " + current + " data";
+        }
+
+        @Override
+        public Map<String, String> messageHeaders(String text, int current) {
+            Map<String, String> messageParameters = new HashMap<>();
+
+            String sentFile = "test " + current;
+
+            sentData.put(sentFile, testMessageContent(current));
+
+            messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageBlobBlobName", sentFile);
+
+            return messageParameters;
+        }
+    }
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[]{"camel-azure-storage-blob-kafka-connector"};
@@ -84,24 +109,6 @@ public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected String testMessageContent(int current) {
-        return "test " + current + " data";
-    }
-
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        Map<String, String> messageParameters = new HashMap<>();
-
-        String sentFile = "test " + current;
-
-        sentData.put(sentFile, testMessageContent(current));
-
-        messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageBlobBlobName", sentFile);
-
-        return messageParameters;
-    }
-
-    @Override
     protected void consumeMessages(CountDownLatch latch) {
         try {
             consume();
@@ -163,7 +170,7 @@ public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
                 .withContainerName(blobContainerName)
                 .withOperation("uploadBlockBlob");
 
-        runTest(factory, topicName, expect);
+        runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 
     @Test
@@ -180,6 +187,6 @@ public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
                     .append("operation", "uploadBlockBlob")
                     .buildUrl();
 
-        runTest(factory, topicName, expect);
+        runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 }
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
index d447703..ef12c18 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
 
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -82,11 +81,6 @@ public class CamelSinkAzureStorageQueueITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
-
-    @Override
     protected void consumeMessages(CountDownLatch latch) {
         try {
             consume();
diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
index 2949fff..b88946e 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.kafkaconnector.cassandra.sink;
 
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -85,11 +84,6 @@ public class CamelSinkCassandraITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
-
-    @Override
     protected void consumeMessages(CountDownLatch latch) {
         try {
             if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) {
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java
new file mode 100644
index 0000000..28d3d0d
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.test;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Base producer that sends {@code count} messages, built by the subclass hooks, to a single topic. */
+public abstract class AbstractTestMessageProducer<T> implements TestMessageProducer<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTestMessageProducer.class);
+
+    private final KafkaClient<String, T> kafkaClient;
+    private final String topicName;
+    private final int count;
+
+    public AbstractTestMessageProducer(KafkaClient<String, T> kafkaClient, String topicName, int count) {
+        this.kafkaClient = kafkaClient;
+        this.topicName = topicName;
+        this.count = count;
+    }
+
+    public AbstractTestMessageProducer(String bootstrapServer, String topicName, int count) {
+        this(new KafkaClient<>(bootstrapServer), topicName, count); // single place initializes the fields
+    }
+
+    /** Sends messages 0..count-1 in order; a null header map means the record is sent without headers. */
+    @Override
+    public void produceMessages() throws ExecutionException, InterruptedException {
+        LOG.trace("Producing messages ...");
+        for (int i = 0; i < count; i++) {
+            T message = testMessageContent(i);
+            Map<String, String> headers = messageHeaders(message, i);
+            if (headers == null) {
+                kafkaClient.produce(topicName, message);
+            } else {
+                kafkaClient.produce(topicName, message, headers);
+            }
+        }
+    }
+}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
index bd02eef..b414726 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
@@ -18,7 +18,6 @@
 package org.apache.camel.kafkaconnector.common.test;
 
 import java.time.Duration;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -26,41 +25,12 @@ import java.util.concurrent.Executors;
 
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.jupiter.api.Assertions.fail;
-
 public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTestSupport.class);
 
-    protected abstract Map<String, String> messageHeaders(String text, int current);
-
-    protected String testMessageContent(int current) {
-        return  "Sink test message " + current;
-    }
-
-    protected void produceMessages(String topicName, int count)  {
-        try {
-            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-            for (int i = 0; i < count; i++) {
-                String message = testMessageContent(i);
-                Map<String, String> headers = messageHeaders(message, i);
-
-                if (headers == null) {
-                    kafkaClient.produce(topicName, message);
-                } else {
-                    kafkaClient.produce(topicName, message, headers);
-                }
-            }
-        } catch (Throwable t) {
-            LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t);
-            fail(String.format("Unable to publish messages to the broker: %s", t.getMessage()));
-        }
-    }
-
     /**
      * A simple test runner that follows the steps: initialize, start consumer, produce messages, verify results
      *
@@ -70,7 +40,35 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
      * @throws Exception For test-specific exceptions
      */
     protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception {
-        runTest(connectorPropertyFactory, () -> produceMessages(topic, count));
+        // Default case: a StringMessageProducer sending 'count' messages to the given topic
+        String bootstrapServers = getKafkaService().getBootstrapServers();
+        StringMessageProducer defaultProducer = new StringMessageProducer(bootstrapServers, topic, count);
+        runTest(connectorPropertyFactory, defaultProducer);
+    }
+
+    /**
+     * A simple test runner that follows the steps: initialize, start consumer, produce messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param producer The producer whose produceMessages() is invoked to send the test messages
+     * @throws Exception For test-specific exceptions
+     */
+    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer<?> producer) throws Exception {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+        LOG.debug("Creating the consumer ...");
+        ExecutorService service = Executors.newCachedThreadPool();
+        CountDownLatch latch = new CountDownLatch(1);
+        service.submit(() -> consumeMessages(latch));
+
+        producer.produceMessages();
+
+        LOG.debug("Waiting for the messages to be processed");
+        service.shutdown(); // accepts no new tasks; the already-submitted consumer task keeps running
+
+        LOG.debug("Waiting for the test to complete");
+        verifyMessages(latch);
     }
 
     /**
@@ -80,7 +78,7 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
      * @throws ExecutionException
      * @throws InterruptedException
      */
-    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws ExecutionException, InterruptedException {
+    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
 
@@ -90,7 +88,7 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
         CountDownLatch latch = new CountDownLatch(1);
         service.submit(() -> consumeMessages(latch));
 
-        producer.producerMessages();
+        producer.produceMessages();
 
         LOG.debug("Waiting for the messages to be processed");
         service.shutdown();
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/FunctionalTestMessageProducer.java
similarity index 91%
copy from tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
copy to tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/FunctionalTestMessageProducer.java
index dedcf97..794fefa 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/FunctionalTestMessageProducer.java
@@ -18,6 +18,6 @@
 package org.apache.camel.kafkaconnector.common.test;
 
 @FunctionalInterface
-public interface TestMessageProducer {
-    void producerMessages();
+public interface FunctionalTestMessageProducer { // lambda-friendly producer: one no-argument callback
+    void produceMessages(); // declares no checked exceptions
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageProducer.java
similarity index 51%
copy from tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
copy to tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageProducer.java
index dedcf97..c25d2b5 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageProducer.java
@@ -17,7 +17,30 @@
 
 package org.apache.camel.kafkaconnector.common.test;
 
-@FunctionalInterface
-public interface TestMessageProducer {
-    void producerMessages();
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+
+/**
+ * A producer that sends the 'count' amount of text messages to the Kafka broker. Subclasses
+ * may override the content and header hooks to customize what is sent.
+ */
+public class StringMessageProducer extends AbstractTestMessageProducer<String> {
+    public StringMessageProducer(String bootStrapServer, String topicName, int count) {
+        super(bootStrapServer, topicName, count);
+    }
+
+    public StringMessageProducer(KafkaClient<String, String> kafkaClient, String topicName, int count) {
+        super(kafkaClient, topicName, count);
+    }
+
+    @Override
+    public Map<String, String> messageHeaders(String text, int current) {
+        return null; // no headers: AbstractTestMessageProducer then produces the record without a header map
+    }
+
+    @Override
+    public String testMessageContent(int current) {
+        return "Sink test message " + current;
+    }
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
index dedcf97..ff69c5a 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
@@ -17,7 +17,14 @@
 
 package org.apache.camel.kafkaconnector.common.test;
 
-@FunctionalInterface
-public interface TestMessageProducer {
-    void producerMessages();
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A producer of test messages of type T
+ */
+public interface TestMessageProducer<T> {
+    Map<String, String> messageHeaders(T text, int current); // headers for message number 'current'; may be null
+    T testMessageContent(int current); // body of message number 'current'
+    void produceMessages() throws ExecutionException, InterruptedException; // performs the actual send
 }
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
index 15104ac..46bf50d 100644
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
@@ -34,6 +34,7 @@ import com.couchbase.client.java.query.QueryResult;
 import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.couchbase.services.CouchbaseService;
 import org.apache.camel.test.infra.couchbase.services.CouchbaseServiceFactory;
@@ -74,6 +75,28 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport {
 
     private final int expect = 10;
 
+    /** Produces JSON string bodies with a single "data" field and tags each message with a CCB_ID header. */
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+
+        @Override
+        public String testMessageContent(int current) {
+            String payload = String.format("test-%d", current);
+            return JsonObject.create().put("data", payload).toString();
+        }
+
+        @Override
+        public Map<String, String> messageHeaders(String text, int current) {
+            // Header name carries the Camel header prefix; its value is the message number
+            String idHeader = CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID";
+            Map<String, String> headers = new HashMap<>();
+            headers.put(idHeader, String.valueOf(current));
+            return headers;
+        }
+    }
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-couchbase-kafka-connector"};
@@ -116,22 +139,6 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected String testMessageContent(int current) {
-        JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", current));
-
-        return jsonObject.toString();
-    }
-
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        Map<String, String> parameters = new HashMap<>();
-
-        parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(current));
-
-        return parameters;
-    }
-
-    @Override
     protected void consumeMessages(CountDownLatch latch) {
         try {
             TestUtils.waitFor(this::waitForMinimumRecordCount);
@@ -210,7 +217,7 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport {
                 .withUsername(service.getUsername())
                 .withPassword(service.getPassword());
 
-        runTest(factory, topic, expect);
+        runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
     }
 
     @RepeatedTest(10)
@@ -229,6 +236,6 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport {
                     .buildUrl();
 
 
-        runTest(factory, topic, expect);
+        runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
     }
 }
diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
index c80f892..bc12b51 100644
--- a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
+++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.kafkaconnector.elasticsearch.sink;
 
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -74,11 +73,6 @@ public class CamelSinkElasticSearchITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
-
-    @Override
     protected void consumeMessages(CountDownLatch latch) {
         try {
             client.waitForIndex();
@@ -88,7 +82,6 @@ public class CamelSinkElasticSearchITCase extends CamelSinkTestSupport {
         } finally {
             latch.countDown();
         }
-
     }
 
     @Override
diff --git a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
index ead6c58..0e19e45 100644
--- a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
+++ b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
@@ -27,12 +27,12 @@ import java.nio.file.StandardWatchEventKinds;
 import java.nio.file.WatchEvent;
 import java.nio.file.WatchKey;
 import java.nio.file.WatchService;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -57,6 +57,17 @@ public class CamelSinkFileITCase extends CamelSinkTestSupport {
     private String topicName;
     private final int expect = 1;
 
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+        // Every message carries the same fixed body; headers keep the StringMessageProducer behavior
+        @Override
+        public String testMessageContent(int current) {
+            return "test";
+        }
+    }
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-file-kafka-connector"};
@@ -81,16 +92,6 @@ public class CamelSinkFileITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected String testMessageContent(int current) {
-        return "test";
-    }
-
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
-
-    @Override
     protected void consumeMessages(CountDownLatch latch) {
         try {
             File sinkFile = new File(SINK_DIR, FILENAME);
@@ -195,7 +196,7 @@ public class CamelSinkFileITCase extends CamelSinkTestSupport {
                 .withFileName(FILENAME)
                 .withDoneFileName(FILENAME + ".done");
 
-        runTest(connectorPropertyFactory, topicName, expect);
+        runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 
     @Test
@@ -208,6 +209,6 @@ public class CamelSinkFileITCase extends CamelSinkTestSupport {
                 .append("doneFileName", FILENAME + ".done")
                 .buildUrl();
 
-        runTest(connectorPropertyFactory, topicName, expect);
+        runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 }
diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
index c7e7cc3..f12f310 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -19,12 +19,12 @@ package org.apache.camel.kafkaconnector.hdfs.sink;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.hdfs.utils.HDFSEasy;
 import org.apache.camel.test.infra.hdfs.v2.services.HDFSService;
@@ -57,6 +57,17 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
 
     private final int expect = 10;
 
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+        // Body is "Sink test message: N" (with a colon), where N is the message number
+        @Override
+        public String testMessageContent(int current) {
+            return "Sink test message: " + current;
+        }
+    }
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-hdfs-kafka-connector"};
@@ -84,16 +95,6 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected String testMessageContent(int current) {
-        return "Sink test message: " + current;
-    }
-
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
-
-    @Override
     protected void consumeMessages(CountDownLatch latch) {
         try {
             TestUtils.waitFor(this::filesCreated);
@@ -153,6 +154,6 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
                 .withPath(currentBasePath.getName())
                 .withSplitStrategy("MESSAGES:1,IDLE:1000");
 
-        runTest(connectorPropertyFactory, topicName, expect);
+        runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 }
diff --git a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
index ea5d2db..33bd066 100644
--- a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
+++ b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
@@ -20,7 +20,6 @@ package org.apache.camel.kafkaconnector.http.sink;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -85,10 +84,6 @@ public class CamelSinkHTTPITCase extends CamelSinkTestSupport {
         }
     }
 
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
 
     @Override
     protected void consumeMessages(CountDownLatch latch) {
diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
index 3663890..f5ed6ed 100644
--- a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient;
 import org.apache.camel.kafkaconnector.jdbc.services.TestDataSource;
@@ -73,6 +74,28 @@ public class CamelSinkJDBCITCase extends CamelSinkTestSupport {
                 .build();
     }
 
+    /** Sends the same parameterized INSERT statement as every message body; the values travel as headers. */
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+
+        @Override
+        public String testMessageContent(int current) {
+            return "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
+        }
+
+        @Override
+        public Map<String, String> messageHeaders(String text, int current) {
+            // The prefix 'CamelHeader' is removed by the SinkTask
+            String testName = "SomeName" + TestUtils.randomWithRange(0, 100);
+            Map<String, String> queryParameters = new HashMap<>();
+            queryParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", testName);
+            queryParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
+            return queryParameters;
+        }
+    }
+
     @BeforeEach
     public void setUp() throws SQLException {
         topicName = getTopicForTest(this);
@@ -86,22 +109,6 @@ public class CamelSinkJDBCITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected String testMessageContent(int current) {
-        return "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
-    }
-
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        Map<String, String> jdbcParameters = new HashMap<>();
-
-        // The prefix 'CamelHeader' is removed by the SinkTask
-        jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
-        jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
-
-        return jdbcParameters;
-    }
-
-    @Override
     protected void consumeMessages(CountDownLatch latch) {
         try {
             LOG.debug("Waiting for indices");
@@ -159,6 +166,6 @@ public class CamelSinkJDBCITCase extends CamelSinkTestSupport {
                 .withUseHeaderAsParameters(true)
                 .withTopics(topicName);
 
-        runTest(factory, topicName, expect);
+        runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 }
diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
index da1b02a..29928af 100644
--- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
+++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.kafkaconnector.mongodb.sink;
 
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -27,6 +26,7 @@ import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
 import org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.mongodb.services.MongoDBService;
 import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory;
@@ -56,6 +56,17 @@ public class CamelSinkMongoDBITCase extends CamelSinkTestSupport {
 
     private final int expect = 10;
 
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+        // Each body is a small JSON document, {"test": "value N"}, where N is the message number
+        @Override
+        public String testMessageContent(int current) {
+            return String.format("{\"test\": \"value %d\"}", current);
+        }
+    }
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[]{"camel-mongodb-kafka-connector"};
@@ -69,16 +80,6 @@ public class CamelSinkMongoDBITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected String testMessageContent(int current) {
-        return String.format("{\"test\": \"value %d\"}", current);
-    }
-
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
-
-    @Override
     protected void consumeMessages(CountDownLatch latch) {
         try {
             MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);
@@ -127,6 +128,6 @@ public class CamelSinkMongoDBITCase extends CamelSinkTestSupport {
                 .withCollection("testRecords")
                 .withOperation("insert");
 
-        runTest(factory, topicName, expect);
+        runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 }
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
index 01ad213..82b97c0 100644
--- a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
@@ -17,7 +17,6 @@
 package org.apache.camel.kafkaconnector.rabbitmq.sink;
 
 import java.nio.charset.StandardCharsets;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -74,11 +73,6 @@ public class RabbitMQSinkITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
-
-    @Override
     protected void consumeMessages(CountDownLatch latch) {
         DeliverCallback deliveryCallback = (consumerTag, delivery) -> {
             if (!this.checkRecord(delivery)) {
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index 432a20a..0b8bb52 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -90,10 +90,6 @@ public class CamelSinkIdempotentJMSITCase extends CamelSinkTestSupport {
         destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
     }
 
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
 
     @Override
     protected void consumeMessages(CountDownLatch latch) {
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
index 5e9b66d..50dabe6 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.kafkaconnector.sjms2.sink;
 
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -85,11 +84,6 @@ public class CamelSinkJMSITCase extends CamelSinkTestSupport {
     }
 
     @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
-
-    @Override
     protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
         if (latch.await(35, TimeUnit.SECONDS)) {
             assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
index 79bf8f9..a6d8bdd 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.sql.client.DatabaseClient;
 import org.apache.camel.kafkaconnector.sql.services.TestDataSource;
@@ -54,6 +55,28 @@ public class CamelSinkSQLITCase extends CamelSinkTestSupport {
     private final int expect = 1;
     private int received;
 
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+
+        @Override
+        public String testMessageContent(int current) {
+            return "test";
+        }
+
+        @Override
+        public Map<String, String> messageHeaders(String text, int current) {
+            Map<String, String> sqlParameters = new HashMap<>();
+
+            // The prefix 'CamelHeader' is removed by the SinkTask
+            sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+            sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
+
+            return sqlParameters;
+        }
+    }
+
     public CamelSinkSQLITCase() {
         JdbcDatabaseContainer<?> container = new PostgreSQLContainer<>("postgres:9.6.2")
                 .withDatabaseName("camel")
@@ -80,21 +103,6 @@ public class CamelSinkSQLITCase extends CamelSinkTestSupport {
         client = new DatabaseClient(sqlService.jdbcUrl());
     }
 
-    @Override
-    protected String testMessageContent(int current) {
-        return "test";
-    }
-
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        Map<String, String> sqlParameters = new HashMap<>();
-
-        // The prefix 'CamelHeader' is removed by the SinkTask
-        sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
-        sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
-
-        return sqlParameters;
-    }
 
     @Override
     protected void consumeMessages(CountDownLatch latch) {
@@ -149,6 +157,6 @@ public class CamelSinkSQLITCase extends CamelSinkTestSupport {
                 .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)")
                 .withTopics(topicName);
 
-        runTest(factory, topicName, expect);
+        runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 }
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index 1c71719..d0535d4 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -17,12 +17,12 @@
 
 package org.apache.camel.kafkaconnector.ssh.sink;
 
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.ssh.services.SshService;
 import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
@@ -47,6 +47,17 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
     private final int expect = 3;
     private String topic;
 
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+
+        @Override
+        public String testMessageContent(int current) {
+            return "date";
+        }
+    }
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-ssh-kafka-connector"};
@@ -57,15 +68,7 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
         topic = TestUtils.getDefaultTestTopic(this.getClass());
     }
 
-    @Override
-    protected String testMessageContent(int current) {
-        return "date";
-    }
 
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
 
     @Override
     protected void consumeMessages(CountDownLatch latch) {
@@ -90,6 +93,6 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
                 .withUsername("root")
                 .withPassword("root");
 
-        runTest(connectorPropertyFactory, topic, expect);
+        runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
     }
 }
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
index 1b9f942..78eb2f4 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
@@ -16,12 +16,12 @@
  */
 package org.apache.camel.kafkaconnector.syslog.sink;
 
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.apache.camel.kafkaconnector.syslog.services.SyslogService;
 import org.junit.jupiter.api.BeforeEach;
@@ -49,6 +49,17 @@ public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
     private String topicName;
     private final int expect = 1;
 
+    private static class CustomProducer extends StringMessageProducer {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+
+        @Override
+        public String testMessageContent(int current) {
+            return TEST_TXT;
+        }
+    }
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-syslog-kafka-connector"};
@@ -59,15 +70,6 @@ public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
         topicName = getTopicForTest(this);
     }
 
-    @Override
-    protected String testMessageContent(int current) {
-        return TEST_TXT;
-    }
-
-    @Override
-    protected Map<String, String> messageHeaders(String text, int current) {
-        return null;
-    }
 
     @Override
     protected void consumeMessages(CountDownLatch latch) {
@@ -94,6 +96,6 @@ public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
                 .withPort(FREE_PORT)
                 .withProtocol("udp");
 
-        runTest(connectorPropertyFactory, topicName, expect);
+        runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
     }
 }