You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 07:40:57 UTC
[camel-kafka-connector] branch master updated: Make producing test
messages for sink tests more flexible
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 87d63c2 Make producing test messages for sink tests more flexible
87d63c2 is described below
commit 87d63c21e1714ca7d74f93e9047df7863940e85f
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 18:26:19 2021 +0100
Make producing test messages for sink tests more flexible
---
.../aws/v1/sns/sink/CamelSinkAWSSNSITCase.java | 6 --
.../aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java | 6 --
.../aws/v2/cw/sink/CamelSinkAWSCWITCase.java | 60 ++++++++++---------
.../aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java | 69 +++++++++++-----------
.../aws/v2/iam/sink/CamelSinkAWSIAMITCase.java | 52 ++++++++--------
.../v2/kinesis/sink/CamelSinkAWSKinesisITCase.java | 21 ++++---
.../aws/v2/kms/sink/CamelSinkAWSKMSITCase.java | 25 +++++---
.../aws/v2/s3/sink/CamelSinkAWSS3ITCase.java | 26 +++++---
.../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java | 5 --
.../blob/sink/CamelSinkAzureStorageBlobITCase.java | 47 ++++++++-------
.../sink/CamelSinkAzureStorageQueueITCase.java | 6 --
.../cassandra/sink/CamelSinkCassandraITCase.java | 6 --
.../common/test/AbstractTestMessageProducer.java | 59 ++++++++++++++++++
.../common/test/CamelSinkTestSupport.java | 64 ++++++++++----------
...cer.java => FunctionalTestMessageProducer.java} | 4 +-
...ageProducer.java => StringMessageProducer.java} | 29 ++++++++-
.../common/test/TestMessageProducer.java | 13 +++-
.../couchbase/sink/CamelSinkCouchbaseITCase.java | 43 ++++++++------
.../sink/CamelSinkElasticSearchITCase.java | 7 ---
.../file/sink/CamelSinkFileITCase.java | 27 +++++----
.../hdfs/sink/CamelSinkHDFSITCase.java | 25 ++++----
.../http/sink/CamelSinkHTTPITCase.java | 5 --
.../jdbc/sink/CamelSinkJDBCITCase.java | 41 +++++++------
.../mongodb/sink/CamelSinkMongoDBITCase.java | 25 ++++----
.../rabbitmq/sink/RabbitMQSinkITCase.java | 6 --
.../sjms2/sink/CamelSinkIdempotentJMSITCase.java | 4 --
.../sjms2/sink/CamelSinkJMSITCase.java | 6 --
.../sql/sink/CamelSinkSQLITCase.java | 40 ++++++++-----
.../ssh/sink/CamelSinkSshITCase.java | 23 ++++----
.../syslog/sink/CamelSinkSyslogITCase.java | 24 ++++----
30 files changed, 434 insertions(+), 340 deletions(-)
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
index 8d893c9..aea8c76 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
@@ -18,7 +18,6 @@
package org.apache.camel.kafkaconnector.aws.v1.sns.sink;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -81,11 +80,6 @@ public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport {
}
@Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
-
- @Override
protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
if (latch.await(120, TimeUnit.SECONDS)) {
assertEquals(expect, received,
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
index b38441f..894114f 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -18,7 +18,6 @@
package org.apache.camel.kafkaconnector.aws.v1.sqs.sink;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -91,11 +90,6 @@ public class CamelSinkAWSSQSITCase extends CamelSinkTestSupport {
}
@Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
-
- @Override
protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
if (latch.await(110, TimeUnit.SECONDS)) {
assertEquals(expect, received,
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
index 9b27827..550ea47 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -62,6 +63,23 @@ public class CamelSinkAWSCWITCase extends CamelSinkTestSupport {
private volatile int received;
private final int expect = 10;
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> headers = new HashMap<>();
+
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionName",
+ "test-dimension-" + current);
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionValue", String.valueOf(current));
+
+ return headers;
+ }
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-aws2-cw-kafka-connector"};
@@ -78,17 +96,6 @@ public class CamelSinkAWSCWITCase extends CamelSinkTestSupport {
}
@Override
- protected Map<String, String> messageHeaders(String text, int current) {
- Map<String, String> headers = new HashMap<>();
-
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionName",
- "test-dimension-" + current);
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionValue", String.valueOf(current));
-
- return headers;
- }
-
- @Override
protected void consumeMessages(CountDownLatch latch) {
try {
ListMetricsRequest request = ListMetricsRequest.builder()
@@ -134,23 +141,18 @@ public class CamelSinkAWSCWITCase extends CamelSinkTestSupport {
@Test
@Timeout(value = 120)
- public void testBasicSendReceive() {
- try {
- Properties amazonProperties = awsService.getConnectionProperties();
- String topicName = TestUtils.getDefaultTestTopic(this.getClass());
-
- ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory
- .basic()
- .withTopics(topicName)
- .withConfiguration(TestCloudWatchConfiguration.class.getName())
- .withAmazonConfig(amazonProperties)
- .withName(metricName)
- .withSinkPathNamespace(namespace);
-
- runTest(testProperties, topicName, expect);
- } catch (Exception e) {
- LOG.error("Amazon CloudWatch test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
+ public void testBasicSendReceive() throws Exception {
+ Properties amazonProperties = awsService.getConnectionProperties();
+ String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+ ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withConfiguration(TestCloudWatchConfiguration.class.getName())
+ .withAmazonConfig(amazonProperties)
+ .withName(metricName)
+ .withSinkPathNamespace(namespace);
+
+ runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
index ee6e350..0c64bcf 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
@@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguration;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -56,6 +57,26 @@ public class CamelSinkAWSEC2ITCase extends CamelSinkTestSupport {
private volatile int received;
private final int expect = 10;
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> headers = new HashMap<>();
+
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2ImageId",
+ "image-id-" + current);
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceType", "T1_MICRO");
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMinCount", "1");
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMaxCount", "1");
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceSecurityGroups", "default");
+
+ return headers;
+ }
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-aws2-ec2-kafka-connector"};
@@ -69,19 +90,6 @@ public class CamelSinkAWSEC2ITCase extends CamelSinkTestSupport {
received = 0;
}
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- Map<String, String> headers = new HashMap<>();
-
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2ImageId",
- "image-id-" + current);
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceType", "T1_MICRO");
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMinCount", "1");
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceMaxCount", "1");
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsEC2InstanceSecurityGroups", "default");
-
- return headers;
- }
@Override
protected void consumeMessages(CountDownLatch latch) {
@@ -119,31 +127,24 @@ public class CamelSinkAWSEC2ITCase extends CamelSinkTestSupport {
fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
received, expect));
}
-
-
}
@Test
@Timeout(90)
- public void testBasicSendReceive() {
- try {
- Properties amazonProperties = awsService.getConnectionProperties();
- String topicName = TestUtils.getDefaultTestTopic(this.getClass());
-
- ConnectorPropertyFactory testProperties = CamelAWSEC2PropertyFactory
- .basic()
- .withTopics(topicName)
- .withConfiguration(TestCloudWatchConfiguration.class.getName())
- .withAmazonConfig(amazonProperties)
- .withSinkPathLabel(logicalName)
- .withConfiguration(TestEC2Configuration.class.getName())
- .withSinkEndpointOperation("createAndRunInstances");
-
- runTest(testProperties, topicName, expect);
- } catch (Exception e) {
- LOG.error("Amazon EC2 test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
+ public void testBasicSendReceive() throws Exception {
+ Properties amazonProperties = awsService.getConnectionProperties();
+ String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+ ConnectorPropertyFactory testProperties = CamelAWSEC2PropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withConfiguration(TestCloudWatchConfiguration.class.getName())
+ .withAmazonConfig(amazonProperties)
+ .withSinkPathLabel(logicalName)
+ .withConfiguration(TestEC2Configuration.class.getName())
+ .withSinkEndpointOperation("createAndRunInstances");
+
+ runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
index 7c212bc..f88e078 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
@@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguration;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -56,14 +57,20 @@ public class CamelSinkAWSIAMITCase extends CamelSinkTestSupport {
private volatile int received;
private final int expect = 10;
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- Map<String, String> headers = new HashMap<>();
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> headers = new HashMap<>();
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsIAMUsername",
- "username-" + current);
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsIAMUsername",
+ "username-" + current);
- return headers;
+ return headers;
+ }
}
@Override
@@ -117,24 +124,19 @@ public class CamelSinkAWSIAMITCase extends CamelSinkTestSupport {
@Test
@Timeout(90)
- public void testBasicSendReceive() {
- try {
- Properties amazonProperties = awsService.getConnectionProperties();
- String topicName = TestUtils.getDefaultTestTopic(this.getClass());
-
- ConnectorPropertyFactory testProperties = CamelAWSIAMPropertyFactory
- .basic()
- .withTopics(topicName)
- .withConfiguration(TestCloudWatchConfiguration.class.getName())
- .withAmazonConfig(amazonProperties)
- .withSinkPathLabel(logicalName)
- .withConfiguration(TestIAMConfiguration.class.getName())
- .withSinkEndpointOperation("createUser");
-
- runTest(testProperties, topicName, expect);
- } catch (Exception e) {
- LOG.error("Amazon IAM test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
+ public void testBasicSendReceive() throws Exception {
+ Properties amazonProperties = awsService.getConnectionProperties();
+ String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+ ConnectorPropertyFactory testProperties = CamelAWSIAMPropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withConfiguration(TestCloudWatchConfiguration.class.getName())
+ .withAmazonConfig(amazonProperties)
+ .withSinkPathLabel(logicalName)
+ .withConfiguration(TestIAMConfiguration.class.getName())
+ .withSinkEndpointOperation("createUser");
+
+ runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
index b975a3d..9ca84ef 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
@@ -29,6 +29,7 @@ import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils;
import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfiguration;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -64,14 +65,20 @@ public class CamelSinkAWSKinesisITCase extends CamelSinkTestSupport {
private volatile int received;
private final int expect = 10;
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- Map<String, String> headers = new HashMap<>();
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKinesisPartitionKey",
- "partition-" + current);
+ @Override
+ public Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> headers = new HashMap<>();
- return headers;
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKinesisPartitionKey",
+ "partition-" + current);
+
+ return headers;
+ }
}
@Override
@@ -143,6 +150,6 @@ public class CamelSinkAWSKinesisITCase extends CamelSinkTestSupport {
.withConfiguration(TestKinesisConfiguration.class.getName())
.withStreamName(streamName);
- runTest(connectorPropertyFactory, topicName, expect);
+ runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
index 4f57799..f58c54a 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -62,18 +63,24 @@ public class CamelSinkAWSKMSITCase extends CamelSinkTestSupport {
private volatile int received;
private final int expect = 10;
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- Map<String, String> headers = new HashMap<>();
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSKeyId",
- String.valueOf(current));
+ @Override
+ public Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> headers = new HashMap<>();
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSDescription",
- "test key " + current);
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSKeyId",
+ String.valueOf(current));
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKMSDescription",
+ "test key " + current);
- return headers;
+
+ return headers;
+ }
}
@Override
@@ -146,6 +153,6 @@ public class CamelSinkAWSKMSITCase extends CamelSinkTestSupport {
.withSinkEndpointOperation("createKey")
.withSinkPathLabel(logicalName);
- runTest(connectorPropertyFactory, topicName, expect);
+ runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
index ea77a09..1311507 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
@@ -29,6 +29,7 @@ import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils;
import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -59,18 +60,25 @@ public class CamelSinkAWSS3ITCase extends CamelSinkTestSupport {
private volatile int received;
private int expect = 10;
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- Map<String, String> headers = new HashMap<>();
+ private class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> headers = new HashMap<>();
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3Key",
- "file" + current + ".txt");
- headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3BucketName",
- bucketName);
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3Key",
+ "file" + current + ".txt");
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3BucketName",
+ bucketName);
- return headers;
+ return headers;
+ }
}
+
@Override
protected void consumeMessages(CountDownLatch latch) {
try {
@@ -147,6 +155,6 @@ public class CamelSinkAWSS3ITCase extends CamelSinkTestSupport {
.withBucketNameOrArn(bucketName)
.withAutoCreateBucket(true);
- runTest(testProperties, topicName, expect);
+ runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
index 85b305b..c686377 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -18,7 +18,6 @@
package org.apache.camel.kafkaconnector.aws.v2.sqs.sink;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -88,10 +87,6 @@ public class CamelSinkAWSSQSITCase extends CamelSinkTestSupport {
}
}
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
@Override
protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
index 1bbf9f1..727d3fb 100644
--- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
@@ -30,6 +30,7 @@ import com.azure.storage.blob.models.BlobItem;
import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.azure.common.AzureCredentialsHolder;
import org.apache.camel.test.infra.azure.common.services.AzureService;
@@ -62,6 +63,30 @@ public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
private int expect = 10;
private int received;
+ private class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public String testMessageContent(int current) {
+ return "test " + current + " data";
+ }
+
+ @Override
+ public Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> messageParameters = new HashMap<>();
+
+ String sentFile = "test " + current;
+
+ sentData.put(sentFile, testMessageContent(current));
+
+ messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageBlobBlobName", sentFile);
+
+ return messageParameters;
+ }
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[]{"camel-azure-storage-blob-kafka-connector"};
@@ -84,24 +109,6 @@ public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
}
@Override
- protected String testMessageContent(int current) {
- return "test " + current + " data";
- }
-
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- Map<String, String> messageParameters = new HashMap<>();
-
- String sentFile = "test " + current;
-
- sentData.put(sentFile, testMessageContent(current));
-
- messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageBlobBlobName", sentFile);
-
- return messageParameters;
- }
-
- @Override
protected void consumeMessages(CountDownLatch latch) {
try {
consume();
@@ -163,7 +170,7 @@ public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
.withContainerName(blobContainerName)
.withOperation("uploadBlockBlob");
- runTest(factory, topicName, expect);
+ runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
@Test
@@ -180,6 +187,6 @@ public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
.append("operation", "uploadBlockBlob")
.buildUrl();
- runTest(factory, topicName, expect);
+ runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
}
diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
index d447703..ef12c18 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -17,7 +17,6 @@
package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -82,11 +81,6 @@ public class CamelSinkAzureStorageQueueITCase extends CamelSinkTestSupport {
}
@Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
-
- @Override
protected void consumeMessages(CountDownLatch latch) {
try {
consume();
diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
index 2949fff..b88946e 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
@@ -17,7 +17,6 @@
package org.apache.camel.kafkaconnector.cassandra.sink;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -85,11 +84,6 @@ public class CamelSinkCassandraITCase extends CamelSinkTestSupport {
}
@Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
-
- @Override
protected void consumeMessages(CountDownLatch latch) {
try {
if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) {
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java
new file mode 100644
index 0000000..28d3d0d
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.test;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTestMessageProducer<T> implements TestMessageProducer<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTestMessageProducer.class);
+
+ private final KafkaClient<String, T> kafkaClient;
+ private final String topicName;
+ private final int count;
+
+ public AbstractTestMessageProducer(KafkaClient<String, T> kafkaClient, String topicName, int count) {
+ this.kafkaClient = kafkaClient;
+ this.topicName = topicName;
+ this.count = count;
+ }
+
+ public AbstractTestMessageProducer(String bootstrapServer, String topicName, int count) {
+ this.kafkaClient = new KafkaClient<>(bootstrapServer);
+ this.topicName = topicName;
+ this.count = count;
+ }
+
+ public void produceMessages() throws ExecutionException, InterruptedException {
+ LOG.trace("Producing messages ...");
+ for (int i = 0; i < count; i++) {
+ T message = testMessageContent(i);
+ Map<String, String> headers = messageHeaders(message, i);
+
+ if (headers == null) {
+ kafkaClient.produce(topicName, message);
+ } else {
+ kafkaClient.produce(topicName, message, headers);
+ }
+ }
+ }
+}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
index bd02eef..b414726 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
@@ -18,7 +18,6 @@
package org.apache.camel.kafkaconnector.common.test;
import java.time.Duration;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -26,41 +25,12 @@ import java.util.concurrent.Executors;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.jupiter.api.Assertions.fail;
-
public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTestSupport.class);
- protected abstract Map<String, String> messageHeaders(String text, int current);
-
- protected String testMessageContent(int current) {
- return "Sink test message " + current;
- }
-
- protected void produceMessages(String topicName, int count) {
- try {
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
- for (int i = 0; i < count; i++) {
- String message = testMessageContent(i);
- Map<String, String> headers = messageHeaders(message, i);
-
- if (headers == null) {
- kafkaClient.produce(topicName, message);
- } else {
- kafkaClient.produce(topicName, message, headers);
- }
- }
- } catch (Throwable t) {
- LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t);
- fail(String.format("Unable to publish messages to the broker: %s", t.getMessage()));
- }
- }
-
/**
* A simple test runner that follows the steps: initialize, start consumer, produce messages, verify results
*
@@ -70,7 +40,35 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
* @throws Exception For test-specific exceptions
*/
protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception {
- runTest(connectorPropertyFactory, () -> produceMessages(topic, count));
+ StringMessageProducer stringMessageProducer = new StringMessageProducer(getKafkaService().getBootstrapServers(),
+ topic, count);
+
+ runTest(connectorPropertyFactory, stringMessageProducer);
+ }
+
+ /**
+ * A simple test runner that follows the steps: initialize, start consumer, produce messages, verify results
+ *
+ * @param connectorPropertyFactory A factory for connector properties
+ * @throws Exception For test-specific exceptions
+ */
+ protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws Exception {
+ connectorPropertyFactory.log();
+ getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+ LOG.debug("Creating the consumer ...");
+ ExecutorService service = Executors.newCachedThreadPool();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ service.submit(() -> consumeMessages(latch));
+
+ producer.produceMessages();
+
+ LOG.debug("Waiting for the messages to be processed");
+ service.shutdown();
+
+ LOG.debug("Waiting for the test to complete");
+ verifyMessages(latch);
}
/**
@@ -80,7 +78,7 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
* @throws ExecutionException
* @throws InterruptedException
*/
- protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws ExecutionException, InterruptedException {
+ protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
@@ -90,7 +88,7 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
CountDownLatch latch = new CountDownLatch(1);
service.submit(() -> consumeMessages(latch));
- producer.producerMessages();
+ producer.produceMessages();
LOG.debug("Waiting for the messages to be processed");
service.shutdown();
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/FunctionalTestMessageProducer.java
similarity index 91%
copy from tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
copy to tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/FunctionalTestMessageProducer.java
index dedcf97..794fefa 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/FunctionalTestMessageProducer.java
@@ -18,6 +18,6 @@
package org.apache.camel.kafkaconnector.common.test;
@FunctionalInterface
-public interface TestMessageProducer {
- void producerMessages();
+public interface FunctionalTestMessageProducer {
+ void produceMessages();
}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageProducer.java
similarity index 51%
copy from tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
copy to tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageProducer.java
index dedcf97..c25d2b5 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageProducer.java
@@ -17,7 +17,30 @@
package org.apache.camel.kafkaconnector.common.test;
-@FunctionalInterface
-public interface TestMessageProducer {
- void producerMessages();
+import java.util.Map;
+
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+
+/**
+ * A producer that sends the 'count' amount of text messages to the Kafka broker
+ */
+public class StringMessageProducer extends AbstractTestMessageProducer<String> {
+
+ public StringMessageProducer(String bootStrapServer, String topicName, int count) {
+ super(bootStrapServer, topicName, count);
+ }
+
+ public StringMessageProducer(KafkaClient<String, String> kafkaClient, String topicName, int count) {
+ super(kafkaClient, topicName, count);
+ }
+
+ @Override
+ public Map<String, String> messageHeaders(String text, int current) {
+ return null;
+ }
+
+ @Override
+ public String testMessageContent(int current) {
+ return "Sink test message " + current;
+ }
}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
index dedcf97..ff69c5a 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
@@ -17,7 +17,14 @@
package org.apache.camel.kafkaconnector.common.test;
-@FunctionalInterface
-public interface TestMessageProducer {
- void producerMessages();
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A producer of test messages
+ */
+public interface TestMessageProducer<T> {
+ Map<String, String> messageHeaders(T text, int current);
+ T testMessageContent(int current);
+ void produceMessages() throws ExecutionException, InterruptedException;
}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
index 15104ac..46bf50d 100644
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
@@ -34,6 +34,7 @@ import com.couchbase.client.java.query.QueryResult;
import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.couchbase.services.CouchbaseService;
import org.apache.camel.test.infra.couchbase.services.CouchbaseServiceFactory;
@@ -74,6 +75,28 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport {
private final int expect = 10;
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public String testMessageContent(int current) {
+ JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", current));
+
+ return jsonObject.toString();
+ }
+
+ @Override
+ public Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> parameters = new HashMap<>();
+
+ parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(current));
+
+ return parameters;
+ }
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-couchbase-kafka-connector"};
@@ -116,22 +139,6 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport {
}
@Override
- protected String testMessageContent(int current) {
- JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", current));
-
- return jsonObject.toString();
- }
-
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- Map<String, String> parameters = new HashMap<>();
-
- parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(current));
-
- return parameters;
- }
-
- @Override
protected void consumeMessages(CountDownLatch latch) {
try {
TestUtils.waitFor(this::waitForMinimumRecordCount);
@@ -210,7 +217,7 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport {
.withUsername(service.getUsername())
.withPassword(service.getPassword());
- runTest(factory, topic, expect);
+ runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
}
@RepeatedTest(10)
@@ -229,6 +236,6 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport {
.buildUrl();
- runTest(factory, topic, expect);
+ runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
}
}
diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
index c80f892..bc12b51 100644
--- a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
+++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
@@ -17,7 +17,6 @@
package org.apache.camel.kafkaconnector.elasticsearch.sink;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -74,11 +73,6 @@ public class CamelSinkElasticSearchITCase extends CamelSinkTestSupport {
}
@Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
-
- @Override
protected void consumeMessages(CountDownLatch latch) {
try {
client.waitForIndex();
@@ -88,7 +82,6 @@ public class CamelSinkElasticSearchITCase extends CamelSinkTestSupport {
} finally {
latch.countDown();
}
-
}
@Override
diff --git a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
index ead6c58..0e19e45 100644
--- a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
+++ b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
@@ -27,12 +27,12 @@ import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -57,6 +57,17 @@ public class CamelSinkFileITCase extends CamelSinkTestSupport {
private String topicName;
private final int expect = 1;
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public String testMessageContent(int current) {
+ return "test";
+ }
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-file-kafka-connector"};
@@ -81,16 +92,6 @@ public class CamelSinkFileITCase extends CamelSinkTestSupport {
}
@Override
- protected String testMessageContent(int current) {
- return "test";
- }
-
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
-
- @Override
protected void consumeMessages(CountDownLatch latch) {
try {
File sinkFile = new File(SINK_DIR, FILENAME);
@@ -195,7 +196,7 @@ public class CamelSinkFileITCase extends CamelSinkTestSupport {
.withFileName(FILENAME)
.withDoneFileName(FILENAME + ".done");
- runTest(connectorPropertyFactory, topicName, expect);
+ runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
@Test
@@ -208,6 +209,6 @@ public class CamelSinkFileITCase extends CamelSinkTestSupport {
.append("doneFileName", FILENAME + ".done")
.buildUrl();
- runTest(connectorPropertyFactory, topicName, expect);
+ runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
}
diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
index c7e7cc3..f12f310 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -19,12 +19,12 @@ package org.apache.camel.kafkaconnector.hdfs.sink;
import java.io.IOException;
import java.net.URISyntaxException;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.hdfs.utils.HDFSEasy;
import org.apache.camel.test.infra.hdfs.v2.services.HDFSService;
@@ -57,6 +57,17 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
private final int expect = 10;
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public String testMessageContent(int current) {
+ return "Sink test message: " + current;
+ }
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-hdfs-kafka-connector"};
@@ -84,16 +95,6 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
}
@Override
- protected String testMessageContent(int current) {
- return "Sink test message: " + current;
- }
-
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
-
- @Override
protected void consumeMessages(CountDownLatch latch) {
try {
TestUtils.waitFor(this::filesCreated);
@@ -153,6 +154,6 @@ public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
.withPath(currentBasePath.getName())
.withSplitStrategy("MESSAGES:1,IDLE:1000");
- runTest(connectorPropertyFactory, topicName, expect);
+ runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
}
diff --git a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
index ea5d2db..33bd066 100644
--- a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
+++ b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
@@ -20,7 +20,6 @@ package org.apache.camel.kafkaconnector.http.sink;
import java.io.IOException;
import java.net.InetAddress;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -85,10 +84,6 @@ public class CamelSinkHTTPITCase extends CamelSinkTestSupport {
}
}
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
@Override
protected void consumeMessages(CountDownLatch latch) {
diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
index 3663890..f5ed6ed 100644
--- a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient;
import org.apache.camel.kafkaconnector.jdbc.services.TestDataSource;
@@ -73,6 +74,28 @@ public class CamelSinkJDBCITCase extends CamelSinkTestSupport {
.build();
}
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public String testMessageContent(int current) {
+ return "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
+ }
+
+ @Override
+ public Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> jdbcParameters = new HashMap<>();
+
+ // The prefix 'CamelHeader' is removed by the SinkTask
+ jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+ jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
+
+ return jdbcParameters;
+ }
+ }
+
@BeforeEach
public void setUp() throws SQLException {
topicName = getTopicForTest(this);
@@ -86,22 +109,6 @@ public class CamelSinkJDBCITCase extends CamelSinkTestSupport {
}
@Override
- protected String testMessageContent(int current) {
- return "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
- }
-
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- Map<String, String> jdbcParameters = new HashMap<>();
-
- // The prefix 'CamelHeader' is removed by the SinkTask
- jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
- jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
-
- return jdbcParameters;
- }
-
- @Override
protected void consumeMessages(CountDownLatch latch) {
try {
LOG.debug("Waiting for indices");
@@ -159,6 +166,6 @@ public class CamelSinkJDBCITCase extends CamelSinkTestSupport {
.withUseHeaderAsParameters(true)
.withTopics(topicName);
- runTest(factory, topicName, expect);
+ runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
}
diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
index da1b02a..29928af 100644
--- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
+++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
@@ -17,7 +17,6 @@
package org.apache.camel.kafkaconnector.mongodb.sink;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -27,6 +26,7 @@ import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.mongodb.services.MongoDBService;
import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory;
@@ -56,6 +56,17 @@ public class CamelSinkMongoDBITCase extends CamelSinkTestSupport {
private final int expect = 10;
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public String testMessageContent(int current) {
+ return String.format("{\"test\": \"value %d\"}", current);
+ }
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[]{"camel-mongodb-kafka-connector"};
@@ -69,16 +80,6 @@ public class CamelSinkMongoDBITCase extends CamelSinkTestSupport {
}
@Override
- protected String testMessageContent(int current) {
- return String.format("{\"test\": \"value %d\"}", current);
- }
-
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
-
- @Override
protected void consumeMessages(CountDownLatch latch) {
try {
MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);
@@ -127,6 +128,6 @@ public class CamelSinkMongoDBITCase extends CamelSinkTestSupport {
.withCollection("testRecords")
.withOperation("insert");
- runTest(factory, topicName, expect);
+ runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
}
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
index 01ad213..82b97c0 100644
--- a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
@@ -17,7 +17,6 @@
package org.apache.camel.kafkaconnector.rabbitmq.sink;
import java.nio.charset.StandardCharsets;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -74,11 +73,6 @@ public class RabbitMQSinkITCase extends CamelSinkTestSupport {
}
@Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
-
- @Override
protected void consumeMessages(CountDownLatch latch) {
DeliverCallback deliveryCallback = (consumerTag, delivery) -> {
if (!this.checkRecord(delivery)) {
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index 432a20a..0b8bb52 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -90,10 +90,6 @@ public class CamelSinkIdempotentJMSITCase extends CamelSinkTestSupport {
destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
}
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
@Override
protected void consumeMessages(CountDownLatch latch) {
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
index 5e9b66d..50dabe6 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
@@ -17,7 +17,6 @@
package org.apache.camel.kafkaconnector.sjms2.sink;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -85,11 +84,6 @@ public class CamelSinkJMSITCase extends CamelSinkTestSupport {
}
@Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
-
- @Override
protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
if (latch.await(35, TimeUnit.SECONDS)) {
assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
index 79bf8f9..a6d8bdd 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.sql.client.DatabaseClient;
import org.apache.camel.kafkaconnector.sql.services.TestDataSource;
@@ -54,6 +55,28 @@ public class CamelSinkSQLITCase extends CamelSinkTestSupport {
private final int expect = 1;
private int received;
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public String testMessageContent(int current) {
+ return "test";
+ }
+
+ @Override
+ public Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> sqlParameters = new HashMap<>();
+
+ // The prefix 'CamelHeader' is removed by the SinkTask
+ sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+ sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
+
+ return sqlParameters;
+ }
+ }
+
public CamelSinkSQLITCase() {
JdbcDatabaseContainer<?> container = new PostgreSQLContainer<>("postgres:9.6.2")
.withDatabaseName("camel")
@@ -80,21 +103,6 @@ public class CamelSinkSQLITCase extends CamelSinkTestSupport {
client = new DatabaseClient(sqlService.jdbcUrl());
}
- @Override
- protected String testMessageContent(int current) {
- return "test";
- }
-
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- Map<String, String> sqlParameters = new HashMap<>();
-
- // The prefix 'CamelHeader' is removed by the SinkTask
- sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
- sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
-
- return sqlParameters;
- }
@Override
protected void consumeMessages(CountDownLatch latch) {
@@ -149,6 +157,6 @@ public class CamelSinkSQLITCase extends CamelSinkTestSupport {
.withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)")
.withTopics(topicName);
- runTest(factory, topicName, expect);
+ runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
}
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index 1c71719..d0535d4 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -17,12 +17,12 @@
package org.apache.camel.kafkaconnector.ssh.sink;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.ssh.services.SshService;
import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
@@ -47,6 +47,17 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
private final int expect = 3;
private String topic;
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public String testMessageContent(int current) {
+ return "date";
+ }
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-ssh-kafka-connector"};
@@ -57,15 +68,7 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
topic = TestUtils.getDefaultTestTopic(this.getClass());
}
- @Override
- protected String testMessageContent(int current) {
- return "date";
- }
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
@Override
protected void consumeMessages(CountDownLatch latch) {
@@ -90,6 +93,6 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
.withUsername("root")
.withPassword("root");
- runTest(connectorPropertyFactory, topic, expect);
+ runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect));
}
}
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
index 1b9f942..78eb2f4 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
@@ -16,12 +16,12 @@
*/
package org.apache.camel.kafkaconnector.syslog.sink;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageProducer;
import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
import org.apache.camel.kafkaconnector.syslog.services.SyslogService;
import org.junit.jupiter.api.BeforeEach;
@@ -49,6 +49,17 @@ public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
private String topicName;
private final int expect = 1;
+ private static class CustomProducer extends StringMessageProducer {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ public String testMessageContent(int current) {
+ return TEST_TXT;
+ }
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-syslog-kafka-connector"};
@@ -59,15 +70,6 @@ public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
topicName = getTopicForTest(this);
}
- @Override
- protected String testMessageContent(int current) {
- return TEST_TXT;
- }
-
- @Override
- protected Map<String, String> messageHeaders(String text, int current) {
- return null;
- }
@Override
protected void consumeMessages(CountDownLatch latch) {
@@ -94,6 +96,6 @@ public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
.withPort(FREE_PORT)
.withProtocol("udp");
- runTest(connectorPropertyFactory, topicName, expect);
+ runTest(connectorPropertyFactory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
}
}