You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/01/22 12:06:38 UTC

[camel-kafka-connector] branch master updated (fb19034 -> c7a6b0f)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from fb19034  Camel-Cron connector: We need to add at least camel-quartz as dependency
     new 11ca8b0  Created a base test class for AWS sink tests
     new 6a78354  Simplified handling of setting common AWS 2 properties
     new c7a6b0f  Added new integration test for AWS 2 Cloud Watch

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 tests/itests-aws-v2/pom.xml                        |   5 +
 .../aws/v2/common/AWSPropertiesUtils.java          |  63 ++++++++
 .../aws/v2/common/CamelSinkAWSTestSupport.java     |  78 ++++++++++
 .../aws/v2/cw/sink/CamelAWSCWPropertyFactory.java  |  73 ++++++++++
 .../aws/v2/cw/sink/CamelSinkAWSCWITCase.java       | 161 +++++++++++++++++++++
 .../sink/TestCloudWatchConfiguration.java}         |  18 +--
 .../source/CamelAWSKinesisPropertyFactory.java     |  15 +-
 .../v2/s3/source/CamelAWSS3PropertyFactory.java    |  15 +-
 .../v2/sqs/sink/CamelAWSSQSPropertyFactory.java    |  26 +---
 .../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java     |  77 ++++------
 .../v2/sqs/source/CamelAWSSQSPropertyFactory.java  |  25 +---
 .../common/BasicConnectorPropertyFactory.java      |   2 +-
 12 files changed, 430 insertions(+), 128 deletions(-)
 create mode 100644 tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/AWSPropertiesUtils.java
 create mode 100644 tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
 create mode 100644 tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java
 create mode 100644 tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
 copy tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/{kinesis/source/TestKinesisConfiguration.java => cw/sink/TestCloudWatchConfiguration.java} (64%)


[camel-kafka-connector] 03/03: Added new integration test for AWS 2 Cloud Watch

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit c7a6b0f82acb257905ec844565edfd028ce9cd61
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Jan 22 11:23:06 2021 +0100

    Added new integration test for AWS 2 Cloud Watch
---
 tests/itests-aws-v2/pom.xml                        |   5 +
 .../aws/v2/common/CamelSinkAWSTestSupport.java     |  18 ++-
 .../aws/v2/cw/sink/CamelAWSCWPropertyFactory.java  |  73 ++++++++++
 .../aws/v2/cw/sink/CamelSinkAWSCWITCase.java       | 161 +++++++++++++++++++++
 .../v2/cw/sink/TestCloudWatchConfiguration.java    |  35 +++++
 .../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java     |  21 ++-
 6 files changed, 302 insertions(+), 11 deletions(-)

diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml
index cd573f3..caf3c28 100644
--- a/tests/itests-aws-v2/pom.xml
+++ b/tests/itests-aws-v2/pom.xml
@@ -67,6 +67,11 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-aws2-s3</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-aws2-cw</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
index c42cb36..a66a474 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.kafkaconnector.aws.v2.common;
 
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -24,7 +25,6 @@ import java.util.concurrent.Executors;
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,13 +33,21 @@ import static org.junit.jupiter.api.Assertions.fail;
 public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSTestSupport.class);
 
+    protected abstract Map<String, String> messageHeaders(String text, int current);
 
-    protected void produceMessages(int count)  {
+    protected void produceMessages(String topicName, int count)  {
         try {
             KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
 
             for (int i = 0; i < count; i++) {
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
+                String message = "Sink test message " + i;
+                Map<String, String> headers = messageHeaders(message, i);
+
+                if (headers == null) {
+                    kafkaClient.produce(topicName, message);
+                } else {
+                    kafkaClient.produce(topicName, message, headers);
+                }
             }
         } catch (Throwable t) {
             LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t);
@@ -47,7 +55,7 @@ public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
         }
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, int count) throws Exception {
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
 
@@ -58,7 +66,7 @@ public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
         service.submit(() -> consumeMessages(latch));
 
         LOG.debug("Creating the producer and sending messages ...");
-        produceMessages(count);
+        produceMessages(topic, count);
 
         LOG.debug("Waiting for the test to complete");
         verifyMessages(latch);
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java
new file mode 100644
index 0000000..cd7f638
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+
+public class CamelAWSCWPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSCWPropertyFactory> {
+    public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+    public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+    static {
+        SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-cw.accessKey");
+        SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-cw.secretKey");
+        SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-cw.region");
+
+        KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-cw.access-key");
+        KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-cw.secret-key");
+        KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-cw.region");
+    }
+
+    public CamelAWSCWPropertyFactory withSinkPathNamespace(String value) {
+        return setProperty("camel.sink.path.namespace", value);
+    }
+
+    public CamelAWSCWPropertyFactory withName(String value) {
+        return setProperty("camel.sink.endpoint.name", value);
+    }
+
+    public CamelAWSCWPropertyFactory withConfiguration(String value) {
+        return setProperty("camel.component.aws2-cw.configuration", classRef(value));
+    }
+
+    public CamelAWSCWPropertyFactory withAmazonConfig(Properties amazonConfigs) {
+        return withAmazonConfig(amazonConfigs, this.SPRING_STYLE);
+    }
+
+    public CamelAWSCWPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+
+        return this;
+    }
+
+    public static CamelAWSCWPropertyFactory basic() {
+        return new CamelAWSCWPropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelAWSCWConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.aws2cw.CamelAws2cwSinkConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
new file mode 100644
index 0000000..62c7122
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
+import software.amazon.awssdk.services.cloudwatch.model.Dimension;
+import software.amazon.awssdk.services.cloudwatch.model.ListMetricsRequest;
+import software.amazon.awssdk.services.cloudwatch.model.ListMetricsResponse;
+import software.amazon.awssdk.services.cloudwatch.model.Metric;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
+public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
+
+    @RegisterExtension
+    public static AWSService awsService = AWSServiceFactory.createCloudWatchService();
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSCWITCase.class);
+
+    private CloudWatchClient client;
+    private String namespace;
+    private String metricName = "test-metric";
+
+    private volatile int received;
+    private final int expect = 10;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-aws2-cw-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        client = AWSSDKClientUtils.newCloudWatchClient();
+
+        namespace = "cw-" + TestUtils.randomWithRange(0, 1000);
+        LOG.debug("Using namespace {} for the test", namespace);
+
+        received = 0;
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> headers = new HashMap<>();
+
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionName",
+                "test-dimension-" + current);
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionValue", String.valueOf(current));
+
+        return headers;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            ListMetricsRequest request = ListMetricsRequest.builder()
+                    .namespace(namespace)
+                    .metricName(metricName)
+                    .build();
+
+            while (true) {
+                ListMetricsResponse response = client.listMetrics(request);
+
+                for (Metric metric : response.metrics()) {
+                    LOG.info("Retrieved metric {}", metric.metricName());
+
+                    for (Dimension dimension : metric.dimensions()) {
+                        LOG.info("Dimension {} value: {}", dimension.name(), dimension.value());
+                        received++;
+
+                        if (received == expect) {
+                            return;
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(Duration.ofSeconds(1).toMillis());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
+                    break;
+                }
+            }
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
+                    received, expect));
+        }
+    }
+
+
+    @Test
+    @Timeout(value = 120)
+    public void testBasicSendReceive() {
+        try {
+            Properties amazonProperties = awsService.getConnectionProperties();
+            String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+            ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory
+                    .basic()
+                    .withTopics(topicName)
+                    .withConfiguration(TestCloudWatchConfiguration.class.getName())
+                    .withAmazonConfig(amazonProperties)
+                    .withName(metricName)
+                    .withSinkPathNamespace(namespace);
+
+            runTest(testProperties, topicName, expect);
+        } catch (Exception e) {
+            LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/TestCloudWatchConfiguration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/TestCloudWatchConfiguration.java
new file mode 100644
index 0000000..e214289
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/TestCloudWatchConfiguration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
+
+import org.apache.camel.component.aws2.cw.Cw2Configuration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
+
+public class TestCloudWatchConfiguration extends Cw2Configuration {
+    private CloudWatchClient client;
+
+    @Override
+    public CloudWatchClient getAmazonCwClient() {
+        if (client == null) {
+            client = AWSSDKClientUtils.newCloudWatchClient();
+        }
+
+        return client;
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
index 6cc9b79..5b17a70 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -18,6 +18,7 @@
 package org.apache.camel.kafkaconnector.aws.v2.sqs.sink;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -88,6 +89,11 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
     }
 
     @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
     protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
         if (latch.await(110, TimeUnit.SECONDS)) {
             assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
@@ -127,15 +133,16 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
     public void testBasicSendReceive() {
         try {
             Properties amazonProperties = awsService.getConnectionProperties();
+            String topicName = TestUtils.getDefaultTestTopic(this.getClass());
 
             ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory
                     .basic()
                     .withName("CamelAwssqsSinkConnectorSpringBootStyle")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withTopics(topicName)
                     .withAmazonConfig(amazonProperties)
                     .withQueueNameOrArn(queueName);
 
-            runTest(testProperties, expect);
+            runTest(testProperties, topicName, expect);
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
             fail(e.getMessage());
@@ -148,15 +155,16 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
     public void testBasicSendReceiveUsingKafkaStyle() {
         try {
             Properties amazonProperties = awsService.getConnectionProperties();
+            String topicName = TestUtils.getDefaultTestTopic(this.getClass());
 
             ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory
                     .basic()
                     .withName("CamelAwssqsSinkConnectorKafkaStyle")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withTopics(topicName)
                     .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE)
                     .withQueueNameOrArn(queueName);
 
-            runTest(testProperties, expect);
+            runTest(testProperties, topicName, expect);
 
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
@@ -170,11 +178,12 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
     public void testBasicSendReceiveUsingUrl() {
         try {
             Properties amazonProperties = awsService.getConnectionProperties();
+            String topicName = TestUtils.getDefaultTestTopic(this.getClass());
 
             ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory
                     .basic()
                     .withName("CamelAwssqsSinkConnectorUsingUrl")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withTopics(topicName)
                     .withUrl(queueName)
                         .append("autoCreateQueue", "true")
                         .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
@@ -184,7 +193,7 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
                         .append("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
                         .buildUrl();
 
-            runTest(testProperties, expect);
+            runTest(testProperties, topicName, expect);
 
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);


[camel-kafka-connector] 02/03: Simplified handling of setting common AWS 2 properties

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 6a783544000a88fef4d68a9bcd4eb9d6736a41c7
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Jan 22 11:00:33 2021 +0100

    Simplified handling of setting common AWS 2 properties
---
 .../aws/v2/common/AWSPropertiesUtils.java          | 63 ++++++++++++++++++++++
 .../source/CamelAWSKinesisPropertyFactory.java     | 15 ++----
 .../v2/s3/source/CamelAWSS3PropertyFactory.java    | 15 ++----
 .../v2/sqs/sink/CamelAWSSQSPropertyFactory.java    | 26 +--------
 .../v2/sqs/source/CamelAWSSQSPropertyFactory.java  | 25 +--------
 .../common/BasicConnectorPropertyFactory.java      |  2 +-
 6 files changed, 76 insertions(+), 70 deletions(-)

diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/AWSPropertiesUtils.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/AWSPropertiesUtils.java
new file mode 100644
index 0000000..0f559dd
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/AWSPropertiesUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.common;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+import software.amazon.awssdk.regions.Region;
+
+public final class AWSPropertiesUtils {
+
+    private AWSPropertiesUtils() {
+
+    }
+
+    public static void setCommonProperties(Properties amazonConfigs, Map<String, String> style,
+                                           BasicConnectorPropertyFactory<?> propertyFactory) {
+        String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY);
+        String secretKeyKey = style.get(AWSConfigs.SECRET_KEY);
+        String regionKey = style.get(AWSConfigs.REGION);
+        String protocolKey = style.get(AWSConfigs.PROTOCOL);
+        String hostKey = style.get(AWSConfigs.AMAZON_AWS_HOST);
+
+        propertyFactory.setProperty(accessKeyKey,
+                amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
+        propertyFactory.setProperty(secretKeyKey,
+                amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));
+        propertyFactory.setProperty(regionKey,
+                amazonConfigs.getProperty(AWSConfigs.REGION, Region.US_EAST_1.toString()));
+
+        String protocol = amazonConfigs.getProperty(AWSConfigs.PROTOCOL, "");
+
+        if (protocolKey != null && !protocolKey.isEmpty()) {
+            if (protocol != null && !protocol.isEmpty()) {
+                propertyFactory.setProperty(protocolKey, protocol);
+            }
+        }
+
+        if (hostKey != null && !hostKey.isEmpty()) {
+            String amazonAwsHost = amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST, "");
+            if (amazonAwsHost != null && !amazonAwsHost.isEmpty()) {
+                propertyFactory.setProperty(hostKey, amazonAwsHost);
+            }
+        }
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelAWSKinesisPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelAWSKinesisPropertyFactory.java
index 27e8fc3..78ef14d 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelAWSKinesisPropertyFactory.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelAWSKinesisPropertyFactory.java
@@ -21,10 +21,10 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
 import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
 import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
 import org.apache.camel.test.infra.aws.common.AWSConfigs;
-import software.amazon.awssdk.regions.Region;
 
 
 /**
@@ -53,16 +53,9 @@ final class CamelAWSKinesisPropertyFactory extends SourceConnectorPropertyFactor
     }
 
     public CamelAWSKinesisPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
-        String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY);
-        String secretKeyKey = style.get(AWSConfigs.SECRET_KEY);
-        String regionKey = style.get(AWSConfigs.REGION);
-
-        setProperty(accessKeyKey,
-                amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
-        setProperty(secretKeyKey,
-                amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));
-        return setProperty(regionKey,
-                amazonConfigs.getProperty(AWSConfigs.REGION, Region.US_EAST_1.toString()));
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+
+        return this;
     }
 
     public CamelAWSKinesisPropertyFactory withStreamName(String streamName) {
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelAWSS3PropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelAWSS3PropertyFactory.java
index a59876b..9584ac6 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelAWSS3PropertyFactory.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelAWSS3PropertyFactory.java
@@ -21,10 +21,10 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
 import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
 import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
 import org.apache.camel.test.infra.aws.common.AWSConfigs;
-import software.amazon.awssdk.regions.Region;
 
 
 /**
@@ -53,16 +53,9 @@ final class CamelAWSS3PropertyFactory extends SourceConnectorPropertyFactory<Cam
     }
 
     public CamelAWSS3PropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
-        String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY);
-        String secretKeyKey = style.get(AWSConfigs.SECRET_KEY);
-        String regionKey = style.get(AWSConfigs.REGION);
-
-        setProperty(accessKeyKey,
-                amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
-        setProperty(secretKeyKey,
-                amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));
-        return setProperty(regionKey,
-                amazonConfigs.getProperty(AWSConfigs.REGION, Region.US_EAST_1.toString()));
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+
+        return this;
     }
 
     public EndpointUrlBuilder<CamelAWSS3PropertyFactory> withUrl(String bucket) {
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelAWSSQSPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelAWSSQSPropertyFactory.java
index 0ac2d5c..91ed4bf 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelAWSSQSPropertyFactory.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelAWSSQSPropertyFactory.java
@@ -21,10 +21,10 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
 import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
 import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
 import org.apache.camel.test.infra.aws.common.AWSConfigs;
-import software.amazon.awssdk.regions.Region;
 
 /**
  * Creates the set of properties used by a Camel JMS Sink Connector
@@ -57,29 +57,7 @@ final class CamelAWSSQSPropertyFactory extends SinkConnectorPropertyFactory<Came
     }
 
     public CamelAWSSQSPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
-        String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY);
-        String secretKeyKey = style.get(AWSConfigs.SECRET_KEY);
-        String regionKey = style.get(AWSConfigs.REGION);
-        String protocolKey = style.get(AWSConfigs.PROTOCOL);
-        String hostKey = style.get(AWSConfigs.AMAZON_AWS_HOST);
-
-        setProperty(accessKeyKey,
-                amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
-        setProperty(secretKeyKey,
-                amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));
-        setProperty(regionKey,
-                amazonConfigs.getProperty(AWSConfigs.REGION, Region.US_EAST_1.toString()));
-
-        String protocol = amazonConfigs.getProperty(AWSConfigs.PROTOCOL, "");
-
-        if (protocol != null && !protocol.isEmpty()) {
-            setProperty(protocolKey, protocol);
-        }
-
-        String amazonAwsHost = amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST, "");
-        if (amazonAwsHost != null && !amazonAwsHost.isEmpty()) {
-            setProperty(hostKey, amazonAwsHost);
-        }
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
 
         return this;
     }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelAWSSQSPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelAWSSQSPropertyFactory.java
index 6e8d946..85aa76e 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelAWSSQSPropertyFactory.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelAWSSQSPropertyFactory.java
@@ -21,10 +21,10 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
 import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
 import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
 import org.apache.camel.test.infra.aws.common.AWSConfigs;
-import software.amazon.awssdk.regions.Region;
 
 /**
  * Creates the set of properties used by a Camel JMS Sink Connector
@@ -56,28 +56,7 @@ final class CamelAWSSQSPropertyFactory extends SourceConnectorPropertyFactory<Ca
     }
 
     public CamelAWSSQSPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
-        String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY);
-        String secretKeyKey = style.get(AWSConfigs.SECRET_KEY);
-        String regionKey = style.get(AWSConfigs.REGION);
-        String protocolKey = style.get(AWSConfigs.PROTOCOL);
-        String hostKey = style.get(AWSConfigs.AMAZON_AWS_HOST);
-
-        setProperty(accessKeyKey,
-                amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
-        setProperty(secretKeyKey,
-                amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));
-        setProperty(regionKey,
-                amazonConfigs.getProperty(AWSConfigs.REGION, Region.US_EAST_1.toString()));
-
-        String protocol = amazonConfigs.getProperty(AWSConfigs.PROTOCOL, "");
-        if (protocol != null && !protocol.isEmpty()) {
-            setProperty(protocolKey, protocol);
-        }
-
-        String amazonAwsHost = amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST, "");
-        if (amazonAwsHost != null && !amazonAwsHost.isEmpty()) {
-            setProperty(hostKey, amazonAwsHost);
-        }
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
 
         return this;
     }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
index 1dc3bdd..a9d012e 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java
@@ -86,7 +86,7 @@ public abstract class BasicConnectorPropertyFactory<T extends BasicConnectorProp
         return new ComponentConfigBuilder<>((T) this, getProperties(), name, value);
     }
 
-    protected T setProperty(String name, Object value) {
+    public T setProperty(String name, Object value) {
         connectorProps.put(name, value);
 
         return (T) this;


[camel-kafka-connector] 01/03: Created a base test class for AWS sink tests

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 11ca8b0f84da9047cdca3dfd0131fceff3ea75c9
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Jan 22 10:08:47 2021 +0100

    Created a base test class for AWS sink tests
---
 .../aws/v2/common/CamelSinkAWSTestSupport.java     | 70 ++++++++++++++++++++++
 .../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java     | 62 +++++--------------
 2 files changed, 86 insertions(+), 46 deletions(-)

diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
new file mode 100644
index 0000000..c42cb36
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.common;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSTestSupport.class);
+
+
+    protected void produceMessages(int count)  {
+        try {
+            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+            for (int i = 0; i < count; i++) {
+                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
+            }
+        } catch (Throwable t) {
+            LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t);
+            fail(String.format("Unable to publish messages to the broker: %s", t.getMessage()));
+        }
+    }
+
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, int count) throws Exception {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+        LOG.debug("Creating the consumer ...");
+        ExecutorService service = Executors.newCachedThreadPool();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        service.submit(() -> consumeMessages(latch));
+
+        LOG.debug("Creating the producer and sending messages ...");
+        produceMessages(count);
+
+        LOG.debug("Waiting for the test to complete");
+        verifyMessages(latch);
+    }
+
+    protected abstract void consumeMessages(CountDownLatch latch);
+
+    protected abstract void verifyMessages(CountDownLatch latch) throws InterruptedException;
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
index 0f770bc..6cc9b79 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -20,14 +20,11 @@ package org.apache.camel.kafkaconnector.aws.v2.sqs.sink;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.AWSConfigs;
@@ -53,13 +50,12 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
+public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
 
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createSQSService();
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class);
 
-
     private AWSSQSClient awssqsClient;
     private String queueName;
 
@@ -91,6 +87,16 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
         }
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
+                    received, expect));
+        }
+    }
+
     private boolean checkMessages(List<Message> messages) {
         for (Message message : messages) {
             LOG.info("Received: {}", message.body());
@@ -106,7 +112,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
     }
 
 
-    private void consumeMessages(CountDownLatch latch) {
+    protected void consumeMessages(CountDownLatch latch) {
         try {
             awssqsClient.receive(queueName, this::checkMessages);
         } catch (Throwable t) {
@@ -116,42 +122,6 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
         }
     }
 
-    private void produceMessages()  {
-        try {
-            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-            for (int i = 0; i < expect; i++) {
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
-            }
-        } catch (Throwable t) {
-            LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t);
-            fail(String.format("Unable to publish messages to the broker: %s", t.getMessage()));
-        }
-    }
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
-        LOG.debug("Creating the consumer ...");
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        CountDownLatch latch = new CountDownLatch(1);
-        service.submit(() -> consumeMessages(latch));
-
-        LOG.debug("Creating the producer and sending messages ...");
-        produceMessages();
-
-        LOG.debug("Waiting for the test to complete");
-        if (latch.await(110, TimeUnit.SECONDS)) {
-            assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } else {
-            fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
-                    received, expect));
-        }
-    }
-
-
     @Test
     @Timeout(value = 120)
     public void testBasicSendReceive() {
@@ -165,7 +135,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
                     .withAmazonConfig(amazonProperties)
                     .withQueueNameOrArn(queueName);
 
-            runTest(testProperties);
+            runTest(testProperties, expect);
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
             fail(e.getMessage());
@@ -186,7 +156,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
                     .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE)
                     .withQueueNameOrArn(queueName);
 
-            runTest(testProperties);
+            runTest(testProperties, expect);
 
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
@@ -214,7 +184,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
                         .append("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
                         .buildUrl();
 
-            runTest(testProperties);
+            runTest(testProperties, expect);
 
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);