You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/01/22 12:06:41 UTC

[camel-kafka-connector] 03/03: Added new integration test for AWS 2 Cloud Watch

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit c7a6b0f82acb257905ec844565edfd028ce9cd61
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Jan 22 11:23:06 2021 +0100

    Added new integration test for AWS 2 Cloud Watch
---
 tests/itests-aws-v2/pom.xml                        |   5 +
 .../aws/v2/common/CamelSinkAWSTestSupport.java     |  18 ++-
 .../aws/v2/cw/sink/CamelAWSCWPropertyFactory.java  |  73 ++++++++++
 .../aws/v2/cw/sink/CamelSinkAWSCWITCase.java       | 161 +++++++++++++++++++++
 .../v2/cw/sink/TestCloudWatchConfiguration.java    |  35 +++++
 .../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java     |  21 ++-
 6 files changed, 302 insertions(+), 11 deletions(-)

diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml
index cd573f3..caf3c28 100644
--- a/tests/itests-aws-v2/pom.xml
+++ b/tests/itests-aws-v2/pom.xml
@@ -67,6 +67,11 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-aws2-s3</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-aws2-cw</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
index c42cb36..a66a474 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.kafkaconnector.aws.v2.common;
 
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -24,7 +25,6 @@ import java.util.concurrent.Executors;
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,13 +33,21 @@ import static org.junit.jupiter.api.Assertions.fail;
 public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSTestSupport.class);
 
+    protected abstract Map<String, String> messageHeaders(String text, int current);
 
-    protected void produceMessages(int count)  {
+    protected void produceMessages(String topicName, int count)  {
         try {
             KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
 
             for (int i = 0; i < count; i++) {
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
+                String message = "Sink test message " + i;
+                Map<String, String> headers = messageHeaders(message, i);
+
+                if (headers == null) {
+                    kafkaClient.produce(topicName, message);
+                } else {
+                    kafkaClient.produce(topicName, message, headers);
+                }
             }
         } catch (Throwable t) {
             LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t);
@@ -47,7 +55,7 @@ public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
         }
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, int count) throws Exception {
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
 
@@ -58,7 +66,7 @@ public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
         service.submit(() -> consumeMessages(latch));
 
         LOG.debug("Creating the producer and sending messages ...");
-        produceMessages(count);
+        produceMessages(topic, count);
 
         LOG.debug("Waiting for the test to complete");
         verifyMessages(latch);
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java
new file mode 100644
index 0000000..cd7f638
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+
+public class CamelAWSCWPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSCWPropertyFactory> {
+    public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+    public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+    static {
+        SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-cw.accessKey");
+        SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-cw.secretKey");
+        SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-cw.region");
+
+        KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-cw.access-key");
+        KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-cw.secret-key");
+        KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-cw.region");
+    }
+
+    public CamelAWSCWPropertyFactory withSinkPathNamespace(String value) {
+        return setProperty("camel.sink.path.namespace", value);
+    }
+
+    public CamelAWSCWPropertyFactory withName(String value) {
+        return setProperty("camel.sink.endpoint.name", value);
+    }
+
+    public CamelAWSCWPropertyFactory withConfiguration(String value) {
+        return setProperty("camel.component.aws2-cw.configuration", classRef(value));
+    }
+
+    public CamelAWSCWPropertyFactory withAmazonConfig(Properties amazonConfigs) {
+        return withAmazonConfig(amazonConfigs, this.SPRING_STYLE);
+    }
+
+    public CamelAWSCWPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+
+        return this;
+    }
+
+    public static CamelAWSCWPropertyFactory basic() {
+        return new CamelAWSCWPropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelAWSCWConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.aws2cw.CamelAws2cwSinkConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
new file mode 100644
index 0000000..62c7122
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
+import software.amazon.awssdk.services.cloudwatch.model.Dimension;
+import software.amazon.awssdk.services.cloudwatch.model.ListMetricsRequest;
+import software.amazon.awssdk.services.cloudwatch.model.ListMetricsResponse;
+import software.amazon.awssdk.services.cloudwatch.model.Metric;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
+public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
+
+    @RegisterExtension
+    public static AWSService awsService = AWSServiceFactory.createCloudWatchService();
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSCWITCase.class);
+
+    private CloudWatchClient client;
+    private String namespace;
+    private String metricName = "test-metric";
+
+    private volatile int received;
+    private final int expect = 10;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-aws2-cw-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        client = AWSSDKClientUtils.newCloudWatchClient();
+
+        namespace = "cw-" + TestUtils.randomWithRange(0, 1000);
+        LOG.debug("Using namespace {} for the test", namespace);
+
+        received = 0;
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> headers = new HashMap<>();
+
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionName",
+                "test-dimension-" + current);
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionValue", String.valueOf(current));
+
+        return headers;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            ListMetricsRequest request = ListMetricsRequest.builder()
+                    .namespace(namespace)
+                    .metricName(metricName)
+                    .build();
+            // Poll once per second until `expect` dimensions are listed or the thread is interrupted.
+            while (true) {
+                ListMetricsResponse response = client.listMetrics(request);
+                received = 0; // every poll re-lists all metrics; restart the tally so none is counted twice
+                for (Metric metric : response.metrics()) {
+                    LOG.info("Retrieved metric {}", metric.metricName());
+
+                    for (Dimension dimension : metric.dimensions()) {
+                        LOG.info("Dimension {} value: {}", dimension.name(), dimension.value());
+                        received++;
+
+                        if (received == expect) {
+                            return;
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(Duration.ofSeconds(1).toMillis());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
+                    break;
+                }
+            }
+        } finally {
+            latch.countDown(); // always release the waiter in verifyMessages, even on failure
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
+                    received, expect));
+        }
+    }
+
+
+    @Test
+    @Timeout(value = 120)
+    public void testBasicSendReceive() {
+        try {
+            Properties amazonProperties = awsService.getConnectionProperties();
+            String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+            ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory
+                    .basic()
+                    .withTopics(topicName)
+                    .withConfiguration(TestCloudWatchConfiguration.class.getName())
+                    .withAmazonConfig(amazonProperties)
+                    .withName(metricName)
+                    .withSinkPathNamespace(namespace);
+
+            runTest(testProperties, topicName, expect); // starts the connector, produces `expect` records, then verifies
+        } catch (Exception e) {
+            LOG.error("Amazon CloudWatch test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/TestCloudWatchConfiguration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/TestCloudWatchConfiguration.java
new file mode 100644
index 0000000..e214289
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/TestCloudWatchConfiguration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
+
+import org.apache.camel.component.aws2.cw.Cw2Configuration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
+
+public class TestCloudWatchConfiguration extends Cw2Configuration {
+    private CloudWatchClient client;
+
+    @Override
+    public CloudWatchClient getAmazonCwClient() {
+        if (client == null) {
+            client = AWSSDKClientUtils.newCloudWatchClient();
+        }
+
+        return client;
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
index 6cc9b79..5b17a70 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -18,6 +18,7 @@
 package org.apache.camel.kafkaconnector.aws.v2.sqs.sink;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -88,6 +89,11 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
     }
 
     @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
     protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
         if (latch.await(110, TimeUnit.SECONDS)) {
             assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
@@ -127,15 +133,16 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
     public void testBasicSendReceive() {
         try {
             Properties amazonProperties = awsService.getConnectionProperties();
+            String topicName = TestUtils.getDefaultTestTopic(this.getClass());
 
             ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory
                     .basic()
                     .withName("CamelAwssqsSinkConnectorSpringBootStyle")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withTopics(topicName)
                     .withAmazonConfig(amazonProperties)
                     .withQueueNameOrArn(queueName);
 
-            runTest(testProperties, expect);
+            runTest(testProperties, topicName, expect);
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
             fail(e.getMessage());
@@ -148,15 +155,16 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
     public void testBasicSendReceiveUsingKafkaStyle() {
         try {
             Properties amazonProperties = awsService.getConnectionProperties();
+            String topicName = TestUtils.getDefaultTestTopic(this.getClass());
 
             ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory
                     .basic()
                     .withName("CamelAwssqsSinkConnectorKafkaStyle")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withTopics(topicName)
                     .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE)
                     .withQueueNameOrArn(queueName);
 
-            runTest(testProperties, expect);
+            runTest(testProperties, topicName, expect);
 
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
@@ -170,11 +178,12 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
     public void testBasicSendReceiveUsingUrl() {
         try {
             Properties amazonProperties = awsService.getConnectionProperties();
+            String topicName = TestUtils.getDefaultTestTopic(this.getClass());
 
             ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory
                     .basic()
                     .withName("CamelAwssqsSinkConnectorUsingUrl")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                    .withTopics(topicName)
                     .withUrl(queueName)
                         .append("autoCreateQueue", "true")
                         .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
@@ -184,7 +193,7 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
                         .append("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
                         .buildUrl();
 
-            runTest(testProperties, expect);
+            runTest(testProperties, topicName, expect);
 
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);