You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/03 17:24:35 UTC

[camel-kafka-connector] 02/18: Convert the AWS v1 tests to the new reusable sink test base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit cc35b51880cbe5214e9cbdac8831e91d645e1720
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 10:54:01 2021 +0100

    Convert the AWS v1 tests to the new reusable sink test base class
---
 .../aws/v1/sns/sink/CamelSinkAWSSNSITCase.java     | 131 +++++++++------------
 .../aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java     | 130 ++++++++------------
 2 files changed, 102 insertions(+), 159 deletions(-)

diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
index 9d7cc80..8d893c9 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
@@ -18,19 +18,16 @@
 package org.apache.camel.kafkaconnector.aws.v1.sns.sink;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sqs.model.Message;
 import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.clients.AWSClientUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
@@ -52,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
+public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport {
 
     @RegisterExtension
     public static AWSService service = AWSServiceFactory.createSNSService();
@@ -61,6 +58,7 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
     private AWSSQSClient awsSqsClient;
     private String sqsQueueUrl;
     private String queueName;
+    private String topicName;
 
     private volatile int received;
     private final int expect = 10;
@@ -72,16 +70,31 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        awsSqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient());
+        topicName = getTopicForTest(this);
 
+        awsSqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient());
         queueName = AWSCommon.DEFAULT_SQS_QUEUE_FOR_SNS + "-" + TestUtils.randomWithRange(0, 1000);
         sqsQueueUrl = awsSqsClient.getQueue(queueName);
 
         LOG.info("Created SQS queue {}", sqsQueueUrl);
-
         received = 0;
     }
 
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(120, TimeUnit.SECONDS)) {
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
     private boolean checkMessages(List<Message> messages) {
         for (Message message : messages) {
             LOG.info("Received: {}", message.getBody());
@@ -96,7 +109,8 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
         return true;
     }
 
-    private void consumeMessages(CountDownLatch latch) {
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
             awsSqsClient.receiveFrom(sqsQueueUrl, this::checkMessages);
         } catch (Throwable t) {
@@ -107,92 +121,53 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
         }
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory)
-            throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        LOG.debug("Creating the consumer ...");
-        CountDownLatch latch = new CountDownLatch(1);
-        service.submit(() -> consumeMessages(latch));
-
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
-        }
+    @Test
+    @Timeout(value = 90)
+    public void testBasicSendReceive() throws Exception {
+        Properties amazonProperties = service.getConnectionProperties();
 
-        LOG.debug("Created the consumer ... About to receive messages");
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
+                .withName("CamelAWSSNSSinkConnectorDefault")
+                .withTopics(topicName)
+                .withTopicOrArn(queueName)
+                .withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName())
+                .withAmazonConfig(amazonProperties);
 
-        if (latch.await(120, TimeUnit.SECONDS)) {
-            assertEquals(expect, received,
-                    "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } else {
-            fail("Failed to receive the messages within the specified time");
-        }
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(value = 90)
-    public void testBasicSendReceive() {
-        try {
-            Properties amazonProperties = service.getConnectionProperties();
-
-            ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
-                    .withName("CamelAWSSNSSinkConnectorDefault")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withTopicOrArn(queueName)
-                    .withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName())
-                    .withAmazonConfig(amazonProperties);
-
-            runTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("Amazon SNS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-    }
+    public void testBasicSendReceiveUsingKafkaStyle() throws Exception {
+        Properties amazonProperties = service.getConnectionProperties();
 
-    @Test
-    @Timeout(value = 90)
-    public void testBasicSendReceiveUsingKafkaStyle() {
-        try {
-            Properties amazonProperties = service.getConnectionProperties();
-
-            ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
-                    .withName("CamelAWSSNSSinkKafkaStyleConnector")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withTopicOrArn(queueName)
-                    .withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName())
-                    .withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE);
-
-            runTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("Amazon SNS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
+                .withName("CamelAWSSNSSinkKafkaStyleConnector")
+                .withTopics(topicName)
+                .withTopicOrArn(queueName)
+                .withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName())
+                .withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE);
+
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Disabled("AWS SNS component is failing to parse the sink URL for this one")
     @Test
     @Timeout(value = 90)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            Properties amazonProperties = service.getConnectionProperties();
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        Properties amazonProperties = service.getConnectionProperties();
 
-            ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
-                    .withName("CamelAWSSNSSinkKafkaStyleConnector")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withUrl(queueName)
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
+                .withName("CamelAWSSNSSinkKafkaStyleConnector")
+                .withTopics(topicName)
+                .withUrl(queueName)
                     .append("queueUrl", sqsQueueUrl).append("subscribeSNStoSQS", "true")
                     .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
                     .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
                     .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
-                    .append("configuration", "#class:" + TestSNSConfiguration.class.getName()).buildUrl();
+                    .append("configuration", "#class:" + TestSNSConfiguration.class.getName())
+                    .buildUrl();
 
-            runTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("Amazon SNS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
index 45028ae..b38441f 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -18,18 +18,16 @@
 package org.apache.camel.kafkaconnector.aws.v1.sqs.sink;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sqs.model.Message;
 import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.clients.AWSClientUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
@@ -53,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
+public class CamelSinkAWSSQSITCase extends CamelSinkTestSupport {
 
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createSQSService();
@@ -62,6 +60,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
     private AWSSQSClient awssqsClient;
     private String queueName;
     private String queueUrl;
+    private String topicName;
 
     private volatile int received;
     private final int expect = 10;
@@ -73,6 +72,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         awssqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient());
 
         queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000);
@@ -90,6 +90,22 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
         }
     }
 
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail(String.format("Failed to receive the messages within the specified time: received %d of %d", received,
+                    expect));
+        }
+    }
+
     private boolean checkMessages(List<Message> messages) {
         for (Message message : messages) {
             LOG.info("Received: {}", message.getBody());
@@ -104,7 +120,8 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
         return true;
     }
 
-    private void consumeMessages(CountDownLatch latch) {
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
             awssqsClient.receiveFrom(queueUrl, this::checkMessages);
         } catch (Throwable t) {
@@ -114,104 +131,55 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
         }
     }
 
-    private void produceMessages() {
-        try {
-            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-            for (int i = 0; i < expect; i++) {
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
-            }
-        } catch (Throwable t) {
-            LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t);
-            fail(String.format("Unable to publish messages to the broker: %s", t.getMessage()));
-        }
-    }
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
-        LOG.debug("Creating the consumer ...");
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        CountDownLatch latch = new CountDownLatch(1);
-        service.submit(() -> consumeMessages(latch));
-
-        LOG.debug("Creating the producer and sending messages ...");
-        produceMessages();
-
-        LOG.debug("Waiting for the test to complete");
-        if (latch.await(110, TimeUnit.SECONDS)) {
-            assertEquals(expect, received,
-                    "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } else {
-            fail(String.format("Failed to receive the messages within the specified time: received %d of %d", received,
-                    expect));
-        }
-    }
-
     @Test
     @Timeout(value = 120)
-    public void testBasicSendReceive() {
-        try {
-            Properties amazonProperties = awsService.getConnectionProperties();
+    public void testBasicSendReceive() throws Exception {
+        Properties amazonProperties = awsService.getConnectionProperties();
 
-            ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic()
-                    .withName("CamelAwssqsSinkConnectorSpringBootStyle")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withAmazonConfig(amazonProperties)
-                    .withQueueNameOrArn(queueName);
+        ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic()
+                .withName("CamelAwssqsSinkConnectorSpringBootStyle")
+                .withTopics(topicName)
+                .withAmazonConfig(amazonProperties)
+                .withQueueNameOrArn(queueName);
+
+        runTest(testProperties, topicName, expect);
 
-            runTest(testProperties);
-        } catch (Exception e) {
-            LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
     }
 
     @DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote")
     @Timeout(value = 120)
     @RepeatedTest(3)
-    public void testBasicSendReceiveUsingKafkaStyle() {
-        try {
-            Properties amazonProperties = awsService.getConnectionProperties();
+    public void testBasicSendReceiveUsingKafkaStyle() throws Exception {
+        Properties amazonProperties = awsService.getConnectionProperties();
 
-            ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic()
-                    .withName("CamelAwssqsSinkConnectorKafkaStyle")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE)
-                    .withQueueNameOrArn(queueName);
+        ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic()
+                .withName("CamelAwssqsSinkConnectorKafkaStyle")
+                .withTopics(topicName)
+                .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE)
+                .withQueueNameOrArn(queueName);
 
-            runTest(testProperties);
-
-        } catch (Exception e) {
-            LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        runTest(testProperties, topicName, expect);
     }
 
     @DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote")
     @Timeout(value = 120)
     @RepeatedTest(3)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            Properties amazonProperties = awsService.getConnectionProperties();
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        Properties amazonProperties = awsService.getConnectionProperties();
 
-            ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic()
-                    .withName("CamelAwssqsSinkConnectorUsingUrl")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withUrl(queueName)
+        ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic()
+                .withName("CamelAwssqsSinkConnectorUsingUrl")
+                .withTopics(topicName)
+                .withUrl(queueName)
                     .append("autoCreateQueue", "true")
                     .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
                     .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
                     .append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
                     .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
-                    .append("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST)).buildUrl();
+                    .append("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
+                .buildUrl();
 
-            runTest(testProperties);
-
-        } catch (Exception e) {
-            LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        runTest(testProperties, topicName, expect);
     }
 
 }