You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/15 14:15:47 UTC

[camel-kafka-connector] branch master updated: Added AWS v2 SNS sink integration test

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cf956d  Added AWS v2 SNS sink integration test
3cf956d is described below

commit 3cf956db7a22e262ff0074a0714ff3a9593b4a2e
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Nov 9 15:59:52 2020 +0100

    Added AWS v2 SNS sink integration test
---
 tests/itests-aws-v2/pom.xml                        |   5 +
 .../v2/sns/sink/CamelAWSSNSPropertyFactory.java    |  89 +++++++++++
 .../aws/v2/sns/sink/CamelSinkAWSSNSITCase.java     | 172 +++++++++++++++++++++
 .../aws/v2/sns/sink/TestSnsConfiguration.java      |  40 +++++
 4 files changed, 306 insertions(+)

diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml
index 62048dc..84b13e8 100644
--- a/tests/itests-aws-v2/pom.xml
+++ b/tests/itests-aws-v2/pom.xml
@@ -87,6 +87,11 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-aws2-kms</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-aws2-sns</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelAWSSNSPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelAWSSNSPropertyFactory.java
new file mode 100644
index 0000000..0fafbfb
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelAWSSNSPropertyFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.sns.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+import software.amazon.awssdk.regions.Region;
+
+/**
+ * Creates the set of properties used by a Camel JMS Sink Connector
+ */
+final class CamelAWSSNSPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSSNSPropertyFactory> {
+    public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+    public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+    static {
+        SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-sns.accessKey");
+        SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-sns.secretKey");
+        SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-sns.region");
+
+        KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-sns.access-key");
+        KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-sns.secret-key");
+        KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-sns.region");
+    }
+
+    private CamelAWSSNSPropertyFactory() {
+    }
+
+    public EndpointUrlBuilder<CamelAWSSNSPropertyFactory> withUrl(String topicOrArn) {
+        String sinkUrl = String.format("aws2-sns:%s", topicOrArn);
+
+        return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl);
+    }
+
+    public CamelAWSSNSPropertyFactory withTopicOrArn(String topicOrArn) {
+        return setProperty("camel.sink.path.topicNameOrArn", topicOrArn);
+    }
+
+    public CamelAWSSNSPropertyFactory withSubscribeSNStoSQS(String queue) {
+        return setProperty("camel.sink.endpoint.subscribeSNStoSQS", "true").setProperty("camel.sink.endpoint.queueUrl",
+                queue);
+    }
+
+    public CamelAWSSNSPropertyFactory withAmazonConfig(Properties amazonConfigs) {
+        return withAmazonConfig(amazonConfigs, this.SPRING_STYLE);
+    }
+
+    public CamelAWSSNSPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
+        String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY);
+        String secretKeyKey = style.get(AWSConfigs.SECRET_KEY);
+        String regionKey = style.get(AWSConfigs.REGION);
+
+        setProperty(accessKeyKey, amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
+        setProperty(secretKeyKey, amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));
+        return setProperty(regionKey, amazonConfigs.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id()));
+    }
+
+    public CamelAWSSNSPropertyFactory withConfiguration(String configurationClass) {
+        return setProperty("camel.component.aws2-sns.configuration", classRef(configurationClass));
+    }
+
+    public static CamelAWSSNSPropertyFactory basic() {
+        return new CamelAWSSNSPropertyFactory().withName("CamelAWS2SNSSinkConnector")
+                .withTasksMax(1)
+                .withConnectorClass("org.apache.camel.kafkaconnector.aws2sns.CamelAws2snsSinkConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java
new file mode 100644
index 0000000..f78fc68
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.sns.sink;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.AWSCommon;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sqs.model.Message;
+
+import static org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory.classRef;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
+public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport {
+    @RegisterExtension
+    public static AWSService service = AWSServiceFactory.createSNSService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSNSITCase.class);
+
+    private AWSSQSClient awsSqsClient;
+    private String sqsQueueUrl;
+    private String queueName;
+
+    private volatile int received;
+    private final int expect = 10;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-aws2-sns-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        awsSqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient());
+
+        queueName = AWSCommon.DEFAULT_SQS_QUEUE_FOR_SNS + "-" + TestUtils.randomWithRange(0, 1000);
+        sqsQueueUrl = awsSqsClient.createQueue(queueName);
+
+        LOG.info("Created SQS queue {}", sqsQueueUrl);
+
+        received = 0;
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(120, TimeUnit.SECONDS)) {
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
+    private boolean checkMessages(List<Message> messages) {
+        for (Message message : messages) {
+            LOG.info("Received: {}", message.body());
+
+            received++;
+        }
+
+        if (received == expect) {
+            return false;
+        }
+
+        return true;
+    }
+
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            awsSqsClient.receive(sqsQueueUrl, this::checkMessages);
+        } catch (Throwable t) {
+            LOG.error("Failed to consume messages: {}", t.getMessage(), t);
+            fail(t.getMessage());
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Test
+    @Timeout(value = 90)
+    public void testBasicSendReceive() throws Exception {
+        Properties amazonProperties = service.getConnectionProperties();
+        String topicName = getTopicForTest(this.getClass());
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory
+                .basic()
+                .withName("CamelAWSSNSSinkConnectorDefault")
+                .withTopics(topicName)
+                .withTopicOrArn(queueName)
+                .withSubscribeSNStoSQS(sqsQueueUrl)
+                .withConfiguration(TestSnsConfiguration.class.getName())
+                .withAmazonConfig(amazonProperties);
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(value = 90)
+    public void testBasicSendReceiveUsingKafkaStyle() throws Exception {
+        Properties amazonProperties = service.getConnectionProperties();
+        String topicName = getTopicForTest(this.getClass());
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory
+                .basic()
+                .withName("CamelAWSSNSSinkKafkaStyleConnector")
+                .withTopics(topicName)
+                .withTopicOrArn(queueName)
+                .withSubscribeSNStoSQS(sqsQueueUrl)
+                .withConfiguration(TestSnsConfiguration.class.getName())
+                .withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE);
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(value = 90)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        Properties amazonProperties = service.getConnectionProperties();
+        String topicName = getTopicForTest(this.getClass());
+
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory
+                .basic()
+                .withName("CamelAWSSNSSinkKafkaStyleConnectorWithUrl")
+                .withTopics(topicName)
+                .withUrl(queueName)
+                    .append("queueUrl", sqsQueueUrl)
+                    .append("subscribeSNStoSQS", "true")
+                    .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id()))
+                    .append("configuration", classRef(TestSnsConfiguration.class.getName()))
+                    .buildUrl();
+
+        runTest(connectorPropertyFactory, topicName, expect);
+
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/TestSnsConfiguration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/TestSnsConfiguration.java
new file mode 100644
index 0000000..8dbc040
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/TestSnsConfiguration.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.sns.sink;
+
+import org.apache.camel.component.aws2.sns.Sns2Configuration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import software.amazon.awssdk.services.sns.SnsClient;
+
+public class TestSnsConfiguration extends Sns2Configuration {
+    private SnsClient snsClient;
+
+    public TestSnsConfiguration() {
+        snsClient = AWSSDKClientUtils.newSNSClient();
+    }
+
+    @Override
+    public void setAmazonSNSClient(SnsClient amazonSNSClient) {
+        // NO-OP
+    }
+
+    @Override
+    public SnsClient getAmazonSNSClient() {
+        return snsClient;
+    }
+}