You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/01/22 12:06:41 UTC
[camel-kafka-connector] 03/03: Added new integration test for AWS 2 Cloud Watch
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit c7a6b0f82acb257905ec844565edfd028ce9cd61
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Jan 22 11:23:06 2021 +0100
Added new integration test for AWS 2 Cloud Watch
---
tests/itests-aws-v2/pom.xml | 5 +
.../aws/v2/common/CamelSinkAWSTestSupport.java | 18 ++-
.../aws/v2/cw/sink/CamelAWSCWPropertyFactory.java | 73 ++++++++++
.../aws/v2/cw/sink/CamelSinkAWSCWITCase.java | 161 +++++++++++++++++++++
.../v2/cw/sink/TestCloudWatchConfiguration.java | 35 +++++
.../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java | 21 ++-
6 files changed, 302 insertions(+), 11 deletions(-)
diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml
index cd573f3..caf3c28 100644
--- a/tests/itests-aws-v2/pom.xml
+++ b/tests/itests-aws-v2/pom.xml
@@ -67,6 +67,11 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws2-s3</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-aws2-cw</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
index c42cb36..a66a474 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
@@ -17,6 +17,7 @@
package org.apache.camel.kafkaconnector.aws.v2.common;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -24,7 +25,6 @@ import java.util.concurrent.Executors;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,13 +33,21 @@ import static org.junit.jupiter.api.Assertions.fail;
public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSTestSupport.class);
+ protected abstract Map<String, String> messageHeaders(String text, int current);
- protected void produceMessages(int count) {
+ protected void produceMessages(String topicName, int count) {
try {
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
for (int i = 0; i < count; i++) {
- kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
+ String message = "Sink test message " + i;
+ Map<String, String> headers = messageHeaders(message, i);
+
+ if (headers == null) {
+ kafkaClient.produce(topicName, message);
+ } else {
+ kafkaClient.produce(topicName, message, headers);
+ }
}
} catch (Throwable t) {
LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t);
@@ -47,7 +55,7 @@ public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
}
}
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory, int count) throws Exception {
+ public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
@@ -58,7 +66,7 @@ public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
service.submit(() -> consumeMessages(latch));
LOG.debug("Creating the producer and sending messages ...");
- produceMessages(count);
+ produceMessages(topic, count);
LOG.debug("Waiting for the test to complete");
verifyMessages(latch);
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java
new file mode 100644
index 0000000..cd7f638
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+
+/**
+ * Property factory used by the tests to assemble the configuration of the
+ * AWS 2 CloudWatch sink connector.
+ */
+public class CamelAWSCWPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSCWPropertyFactory> {
+    /** AWS config key to component property name, using camelCase option names. */
+    public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+    /** AWS config key to component property name, using dash-separated option names. */
+    public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+    static {
+        SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-cw.accessKey");
+        SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-cw.secretKey");
+        SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-cw.region");
+
+        KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-cw.access-key");
+        KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-cw.secret-key");
+        KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-cw.region");
+    }
+
+    /**
+     * Sets the {@code camel.sink.path.namespace} property.
+     *
+     * @param value the namespace to use
+     * @return this factory
+     */
+    public CamelAWSCWPropertyFactory withSinkPathNamespace(String value) {
+        return setProperty("camel.sink.path.namespace", value);
+    }
+
+    /**
+     * Sets the {@code camel.sink.endpoint.name} property. {@link #basic()} calls this
+     * method, so the default value of that property is {@code CamelAWSCWConnector}.
+     *
+     * NOTE(review): if the parent factory also declares {@code withName(String)} to set
+     * the connector's own name, this declaration overrides it - confirm that is intended.
+     *
+     * @param value the endpoint name to use
+     * @return this factory
+     */
+    public CamelAWSCWPropertyFactory withName(String value) {
+        return setProperty("camel.sink.endpoint.name", value);
+    }
+
+    /**
+     * Sets the {@code camel.component.aws2-cw.configuration} property to the result of
+     * passing the given value through {@code classRef}.
+     *
+     * @param value presumably the fully qualified name of the configuration class - confirm
+     *              against {@code classRef}
+     * @return this factory
+     */
+    public CamelAWSCWPropertyFactory withConfiguration(String value) {
+        return setProperty("camel.component.aws2-cw.configuration", classRef(value));
+    }
+
+    /**
+     * Applies the AWS connection properties using the {@link #SPRING_STYLE} key mapping.
+     *
+     * @param amazonConfigs the AWS connection properties
+     * @return this factory
+     */
+    public CamelAWSCWPropertyFactory withAmazonConfig(Properties amazonConfigs) {
+        // SPRING_STYLE is static: reference it directly rather than through "this"
+        return withAmazonConfig(amazonConfigs, SPRING_STYLE);
+    }
+
+    /**
+     * Applies the AWS connection properties using the given key mapping.
+     *
+     * @param amazonConfigs the AWS connection properties
+     * @param style the mapping from AWS config keys to component property names, such as
+     *              {@link #SPRING_STYLE} or {@link #KAFKA_STYLE}
+     * @return this factory
+     */
+    public CamelAWSCWPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+
+        return this;
+    }
+
+    /**
+     * Creates a factory pre-populated with a single task, the CloudWatch sink connector
+     * class and string converters for both keys and values.
+     *
+     * @return a new, pre-populated factory
+     */
+    public static CamelAWSCWPropertyFactory basic() {
+        return new CamelAWSCWPropertyFactory()
+                .withTasksMax(1)
+                .withName("CamelAWSCWConnector")
+                .withConnectorClass("org.apache.camel.kafkaconnector.aws2cw.CamelAws2cwSinkConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
new file mode 100644
index 0000000..62c7122
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
+import software.amazon.awssdk.services.cloudwatch.model.Dimension;
+import software.amazon.awssdk.services.cloudwatch.model.ListMetricsRequest;
+import software.amazon.awssdk.services.cloudwatch.model.ListMetricsResponse;
+import software.amazon.awssdk.services.cloudwatch.model.Metric;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
+public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
+
+ @RegisterExtension
+ public static AWSService awsService = AWSServiceFactory.createCloudWatchService();
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSCWITCase.class);
+
+ private CloudWatchClient client;
+ private String namespace;
+ private String metricName = "test-metric";
+
+ private volatile int received;
+ private final int expect = 10;
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-aws2-cw-kafka-connector"};
+ }
+
+ @BeforeEach
+ public void setUp() {
+ client = AWSSDKClientUtils.newCloudWatchClient();
+
+ namespace = "cw-" + TestUtils.randomWithRange(0, 1000);
+ LOG.debug("Using namespace {} for the test", namespace);
+
+ received = 0;
+ }
+
+ @Override
+ protected Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> headers = new HashMap<>();
+
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionName",
+ "test-dimension-" + current);
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionValue", String.valueOf(current));
+
+ return headers;
+ }
+
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ ListMetricsRequest request = ListMetricsRequest.builder()
+ .namespace(namespace)
+ .metricName(metricName)
+ .build();
+
+ while (true) {
+ ListMetricsResponse response = client.listMetrics(request);
+
+ for (Metric metric : response.metrics()) {
+ LOG.info("Retrieved metric {}", metric.metricName());
+
+ for (Dimension dimension : metric.dimensions()) {
+ LOG.info("Dimension {} value: {}", dimension.name(), dimension.value());
+ received++;
+
+ if (received == expect) {
+ return;
+ }
+ }
+ }
+
+ try {
+ Thread.sleep(Duration.ofSeconds(1).toMillis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ break;
+ }
+ }
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ if (latch.await(110, TimeUnit.SECONDS)) {
+ assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
+ } else {
+ fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
+ received, expect));
+ }
+ }
+
+
+ @Test
+ @Timeout(value = 120)
+ public void testBasicSendReceive() {
+ try {
+ Properties amazonProperties = awsService.getConnectionProperties();
+ String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+ ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withConfiguration(TestCloudWatchConfiguration.class.getName())
+ .withAmazonConfig(amazonProperties)
+ .withName(metricName)
+ .withSinkPathNamespace(namespace);
+
+ runTest(testProperties, topicName, expect);
+ } catch (Exception e) {
+ LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
+ fail(e.getMessage());
+ }
+ }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/TestCloudWatchConfiguration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/TestCloudWatchConfiguration.java
new file mode 100644
index 0000000..e214289
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/TestCloudWatchConfiguration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.cw.sink;
+
+import org.apache.camel.component.aws2.cw.Cw2Configuration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
+
+/**
+ * CloudWatch component configuration for the tests: it hands out a client obtained
+ * from {@link AWSSDKClientUtils} in place of whatever the parent configuration holds.
+ */
+public class TestCloudWatchConfiguration extends Cw2Configuration {
+    private CloudWatchClient client;
+
+    /**
+     * Returns the CloudWatch client, creating it through
+     * {@link AWSSDKClientUtils#newCloudWatchClient()} on the first call and reusing the
+     * same instance afterwards.
+     *
+     * @return the CloudWatch client held by this configuration
+     */
+    @Override
+    public CloudWatchClient getAmazonCwClient() {
+        if (client != null) {
+            return client;
+        }
+
+        client = AWSSDKClientUtils.newCloudWatchClient();
+        return client;
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
index 6cc9b79..5b17a70 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -18,6 +18,7 @@
package org.apache.camel.kafkaconnector.aws.v2.sqs.sink;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -88,6 +89,11 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
}
@Override
+ protected Map<String, String> messageHeaders(String text, int current) {
+ return null;
+ }
+
+ @Override
protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
if (latch.await(110, TimeUnit.SECONDS)) {
assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
@@ -127,15 +133,16 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
public void testBasicSendReceive() {
try {
Properties amazonProperties = awsService.getConnectionProperties();
+ String topicName = TestUtils.getDefaultTestTopic(this.getClass());
ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory
.basic()
.withName("CamelAwssqsSinkConnectorSpringBootStyle")
- .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withTopics(topicName)
.withAmazonConfig(amazonProperties)
.withQueueNameOrArn(queueName);
- runTest(testProperties, expect);
+ runTest(testProperties, topicName, expect);
} catch (Exception e) {
LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
fail(e.getMessage());
@@ -148,15 +155,16 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
public void testBasicSendReceiveUsingKafkaStyle() {
try {
Properties amazonProperties = awsService.getConnectionProperties();
+ String topicName = TestUtils.getDefaultTestTopic(this.getClass());
ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory
.basic()
.withName("CamelAwssqsSinkConnectorKafkaStyle")
- .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withTopics(topicName)
.withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE)
.withQueueNameOrArn(queueName);
- runTest(testProperties, expect);
+ runTest(testProperties, topicName, expect);
} catch (Exception e) {
LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
@@ -170,11 +178,12 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
public void testBasicSendReceiveUsingUrl() {
try {
Properties amazonProperties = awsService.getConnectionProperties();
+ String topicName = TestUtils.getDefaultTestTopic(this.getClass());
ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory
.basic()
.withName("CamelAwssqsSinkConnectorUsingUrl")
- .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withTopics(topicName)
.withUrl(queueName)
.append("autoCreateQueue", "true")
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
@@ -184,7 +193,7 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
.append("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
.buildUrl();
- runTest(testProperties, expect);
+ runTest(testProperties, topicName, expect);
} catch (Exception e) {
LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);