You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/15 15:37:54 UTC
[camel-kafka-connector] branch camel-master updated: Added AWS v2
SNS sink integration test
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/camel-master by this push:
new fc78b55 Added AWS v2 SNS sink integration test
fc78b55 is described below
commit fc78b554ccd467a09a3081f634aa3cdcac3ab8d7
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Nov 9 15:59:52 2020 +0100
Added AWS v2 SNS sink integration test
---
tests/itests-aws-v2/pom.xml | 5 +
.../v2/sns/sink/CamelAWSSNSPropertyFactory.java | 89 +++++++++++
.../aws/v2/sns/sink/CamelSinkAWSSNSITCase.java | 172 +++++++++++++++++++++
.../aws/v2/sns/sink/TestSnsConfiguration.java | 40 +++++
4 files changed, 306 insertions(+)
diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml
index 62048dc..84b13e8 100644
--- a/tests/itests-aws-v2/pom.xml
+++ b/tests/itests-aws-v2/pom.xml
@@ -87,6 +87,11 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws2-kms</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-aws2-sns</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelAWSSNSPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelAWSSNSPropertyFactory.java
new file mode 100644
index 0000000..0fafbfb
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelAWSSNSPropertyFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.sns.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+import software.amazon.awssdk.regions.Region;
+
+/**
+ * Creates the set of properties used by a Camel AWS v2 SNS Sink Connector
+ */
+final class CamelAWSSNSPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSSNSPropertyFactory> {
+ public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+ public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+ static {
+ SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-sns.accessKey");
+ SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-sns.secretKey");
+ SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-sns.region");
+
+ KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-sns.access-key");
+ KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-sns.secret-key");
+ KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-sns.region");
+ }
+
+ private CamelAWSSNSPropertyFactory() {
+ }
+
+ public EndpointUrlBuilder<CamelAWSSNSPropertyFactory> withUrl(String topicOrArn) {
+ String sinkUrl = String.format("aws2-sns:%s", topicOrArn);
+
+ return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl);
+ }
+
+ public CamelAWSSNSPropertyFactory withTopicOrArn(String topicOrArn) {
+ return setProperty("camel.sink.path.topicNameOrArn", topicOrArn);
+ }
+
+ public CamelAWSSNSPropertyFactory withSubscribeSNStoSQS(String queue) {
+ return setProperty("camel.sink.endpoint.subscribeSNStoSQS", "true").setProperty("camel.sink.endpoint.queueUrl",
+ queue);
+ }
+
+ public CamelAWSSNSPropertyFactory withAmazonConfig(Properties amazonConfigs) {
+ return withAmazonConfig(amazonConfigs, this.SPRING_STYLE);
+ }
+
+ public CamelAWSSNSPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
+ String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY);
+ String secretKeyKey = style.get(AWSConfigs.SECRET_KEY);
+ String regionKey = style.get(AWSConfigs.REGION);
+
+ setProperty(accessKeyKey, amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
+ setProperty(secretKeyKey, amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));
+ return setProperty(regionKey, amazonConfigs.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id()));
+ }
+
+ public CamelAWSSNSPropertyFactory withConfiguration(String configurationClass) {
+ return setProperty("camel.component.aws2-sns.configuration", classRef(configurationClass));
+ }
+
+ public static CamelAWSSNSPropertyFactory basic() {
+ return new CamelAWSSNSPropertyFactory().withName("CamelAWS2SNSSinkConnector")
+ .withTasksMax(1)
+ .withConnectorClass("org.apache.camel.kafkaconnector.aws2sns.CamelAws2snsSinkConnector")
+ .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+ }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java
new file mode 100644
index 0000000..f78fc68
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.sns.sink;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.AWSCommon;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sqs.model.Message;
+
+import static org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory.classRef;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
+public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport {
+ @RegisterExtension
+ public static AWSService service = AWSServiceFactory.createSNSService();
+
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSNSITCase.class);
+
+ private AWSSQSClient awsSqsClient;
+ private String sqsQueueUrl;
+ private String queueName;
+
+ private volatile int received;
+ private final int expect = 10;
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-aws2-sns-kafka-connector"};
+ }
+
+ @BeforeEach
+ public void setUp() {
+ awsSqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient());
+
+ queueName = AWSCommon.DEFAULT_SQS_QUEUE_FOR_SNS + "-" + TestUtils.randomWithRange(0, 1000);
+ sqsQueueUrl = awsSqsClient.createQueue(queueName);
+
+ LOG.info("Created SQS queue {}", sqsQueueUrl);
+
+ received = 0;
+ }
+
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ if (latch.await(120, TimeUnit.SECONDS)) {
+ assertEquals(expect, received,
+ "Didn't process the expected amount of messages: " + received + " != " + expect);
+ } else {
+ fail("Failed to receive the messages within the specified time");
+ }
+ }
+
+ private boolean checkMessages(List<Message> messages) {
+ for (Message message : messages) {
+ LOG.info("Received: {}", message.body());
+
+ received++;
+ }
+
+ if (received == expect) {
+ return false;
+ }
+
+ return true;
+ }
+
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ awsSqsClient.receive(sqsQueueUrl, this::checkMessages);
+ } catch (Throwable t) {
+ LOG.error("Failed to consume messages: {}", t.getMessage(), t);
+ fail(t.getMessage());
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Test
+ @Timeout(value = 90)
+ public void testBasicSendReceive() throws Exception {
+ Properties amazonProperties = service.getConnectionProperties();
+ String topicName = getTopicForTest(this.getClass());
+
+ ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory
+ .basic()
+ .withName("CamelAWSSNSSinkConnectorDefault")
+ .withTopics(topicName)
+ .withTopicOrArn(queueName)
+ .withSubscribeSNStoSQS(sqsQueueUrl)
+ .withConfiguration(TestSnsConfiguration.class.getName())
+ .withAmazonConfig(amazonProperties);
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
+
+ @Test
+ @Timeout(value = 90)
+ public void testBasicSendReceiveUsingKafkaStyle() throws Exception {
+ Properties amazonProperties = service.getConnectionProperties();
+ String topicName = getTopicForTest(this.getClass());
+
+ ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory
+ .basic()
+ .withName("CamelAWSSNSSinkKafkaStyleConnector")
+ .withTopics(topicName)
+ .withTopicOrArn(queueName)
+ .withSubscribeSNStoSQS(sqsQueueUrl)
+ .withConfiguration(TestSnsConfiguration.class.getName())
+ .withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE);
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
+
+ @Test
+ @Timeout(value = 90)
+ public void testBasicSendReceiveUsingUrl() throws Exception {
+ Properties amazonProperties = service.getConnectionProperties();
+ String topicName = getTopicForTest(this.getClass());
+
+ ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory
+ .basic()
+ .withName("CamelAWSSNSSinkKafkaStyleConnectorWithUrl")
+ .withTopics(topicName)
+ .withUrl(queueName)
+ .append("queueUrl", sqsQueueUrl)
+ .append("subscribeSNStoSQS", "true")
+ .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id()))
+ .append("configuration", classRef(TestSnsConfiguration.class.getName()))
+ .buildUrl();
+
+ runTest(connectorPropertyFactory, topicName, expect);
+
+ }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/TestSnsConfiguration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/TestSnsConfiguration.java
new file mode 100644
index 0000000..8dbc040
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/TestSnsConfiguration.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.sns.sink;
+
+import org.apache.camel.component.aws2.sns.Sns2Configuration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import software.amazon.awssdk.services.sns.SnsClient;
+
+public class TestSnsConfiguration extends Sns2Configuration {
+ private SnsClient snsClient;
+
+ public TestSnsConfiguration() {
+ snsClient = AWSSDKClientUtils.newSNSClient();
+ }
+
+ @Override
+ public void setAmazonSNSClient(SnsClient amazonSNSClient) {
+ // NO-OP: intentionally ignored so the client created in the constructor is always the one returned
+ }
+
+ @Override
+ public SnsClient getAmazonSNSClient() {
+ return snsClient;
+ }
+}