You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/03/01 17:54:40 UTC
[camel-kafka-connector] branch camel-master updated: Add
integration tests for AWS v2 lambda
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/camel-master by this push:
new 28db5ba Add integration tests for AWS v2 lambda
28db5ba is described below
commit 28db5ba3f9b3b143af97298740de620c6b5cc381
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Feb 25 11:31:36 2021 +0100
Add integration tests for AWS v2 lambda
---
tests/itests-aws-v2/pom.xml | 5 +
.../lambda/sink/CamelAWSLambdaPropertyFactory.java | 79 +++++++++
.../aws/v2/lambda/sink/CamelSinkLambdaITCase.java | 187 +++++++++++++++++++++
.../v2/lambda/sink/TestLambda2Configuration.java | 35 ++++
.../clients/kafka/ByteProducerPropertyFactory.java | 52 ++++++
.../common/clients/kafka/KafkaClient.java | 14 +-
.../common/test/AbstractTestMessageProducer.java | 6 +-
7 files changed, 375 insertions(+), 3 deletions(-)
diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml
index 31b8a89..185a1c1 100644
--- a/tests/itests-aws-v2/pom.xml
+++ b/tests/itests-aws-v2/pom.xml
@@ -92,6 +92,11 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws2-sns</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-aws2-lambda</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelAWSLambdaPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelAWSLambdaPropertyFactory.java
new file mode 100644
index 0000000..e8a1433
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelAWSLambdaPropertyFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.lambda.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+
+/**
+ * Property factory used by the integration tests to build the configuration of the
+ * Camel AWS v2 Lambda sink connector.
+ */
+public class CamelAWSLambdaPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSLambdaPropertyFactory> {
+    /** Maps the common AWS configuration keys to camelCase component property names. */
+    public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+    /** Maps the common AWS configuration keys to dash-separated component property names. */
+    public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+    static {
+        SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-lambda.accessKey");
+        SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-lambda.secretKey");
+        SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-lambda.region");
+
+        KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-lambda.access-key");
+        KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-lambda.secret-key");
+        KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-lambda.region");
+    }
+
+    /**
+     * Applies the given AWS connection properties using the {@link #SPRING_STYLE} key mapping.
+     *
+     * @param amazonConfigs the AWS connection properties
+     * @return this factory
+     */
+    public CamelAWSLambdaPropertyFactory withAmazonConfig(Properties amazonConfigs) {
+        // SPRING_STYLE is a static member, so it is referenced directly rather than through "this"
+        return withAmazonConfig(amazonConfigs, SPRING_STYLE);
+    }
+
+    /**
+     * Applies the given AWS connection properties using the supplied key mapping. The work is
+     * delegated to {@code AWSPropertiesUtils.setCommonProperties}.
+     *
+     * @param amazonConfigs the AWS connection properties
+     * @param style the key mapping to use, e.g. {@link #SPRING_STYLE} or {@link #KAFKA_STYLE}
+     * @return this factory
+     */
+    public CamelAWSLambdaPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+
+        return this;
+    }
+
+    /**
+     * Sets the {@code camel.sink.path.function} property.
+     *
+     * @param value the function value to set
+     * @return this factory
+     */
+    public CamelAWSLambdaPropertyFactory withSinkPathFunction(String value) {
+        return setProperty("camel.sink.path.function", value);
+    }
+
+    /**
+     * Sets the {@code camel.sink.endpoint.operation} property.
+     *
+     * @param value the operation value to set
+     * @return this factory
+     */
+    public CamelAWSLambdaPropertyFactory withSinkEndpointOperation(String value) {
+        return setProperty("camel.sink.endpoint.operation", value);
+    }
+
+    /**
+     * Sets the {@code camel.component.aws2-lambda.configuration} property to the result of
+     * passing the given value through {@code classRef}.
+     *
+     * @param value the configuration class name
+     * @return this factory
+     */
+    public CamelAWSLambdaPropertyFactory withConfiguration(String value) {
+        return setProperty("camel.component.aws2-lambda.configuration", classRef(value));
+    }
+
+    /**
+     * Sets the {@code camel.sink.endpoint.pojoRequest} property.
+     *
+     * @param value the flag value to set
+     * @return this factory
+     */
+    public CamelAWSLambdaPropertyFactory withPojoRequest(boolean value) {
+        return setProperty("camel.sink.endpoint.pojoRequest", value);
+    }
+
+    /**
+     * Creates a factory pre-populated with the connector name, the connector class, a single
+     * task and string converters for both keys and values.
+     *
+     * @return a new factory carrying the basic connector settings
+     */
+    public static CamelAWSLambdaPropertyFactory basic() {
+        return new CamelAWSLambdaPropertyFactory()
+                .withTasksMax(1)
+                .withName("CamelAws2lambdaSinkConnector")
+                .withConnectorClass("org.apache.camel.kafkaconnector.aws2lambda.CamelAws2lambdaSinkConnector")
+                .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java
new file mode 100644
index 0000000..e9911ac
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.lambda.sink;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.ByteProducerPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.ConsumerPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.DefaultConsumerPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.clients.kafka.ProducerPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.AbstractTestMessageProducer;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.lambda.LambdaClient;
+import software.amazon.awssdk.services.lambda.model.FunctionConfiguration;
+import software.amazon.awssdk.services.lambda.model.ListFunctionsResponse;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
+public class CamelSinkLambdaITCase extends CamelSinkTestSupport {
+ @RegisterExtension
+ public static AWSService awsService = AWSServiceFactory.createLambdaService();
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkLambdaITCase.class);
+
+ private LambdaClient client;
+ private String function;
+
+ private volatile int received;
+ private final int expect = 1;
+
+
+ private static class CustomProducer extends AbstractTestMessageProducer<Bytes> {
+ public CustomProducer(String bootstrapServer, String topicName, int count) {
+ super(bootstrapServer, topicName, count);
+ }
+
+ @Override
+ protected KafkaClient<String, Bytes> createKafkaClient(String bootstrapServer) {
+ ConsumerPropertyFactory consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootstrapServer);
+ ProducerPropertyFactory producerPropertyFactory = new ByteProducerPropertyFactory(bootstrapServer);
+
+ return new KafkaClient<>(consumerPropertyFactory, producerPropertyFactory);
+ }
+
+ @Override
+ public Bytes testMessageContent(int current) {
+
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ ZipOutputStream zip = new ZipOutputStream(out);
+
+ ZipEntry entry = new ZipEntry("test");
+ zip.putNextEntry(entry);
+ zip.write("hello test".getBytes());
+ zip.closeEntry();
+ zip.finish();
+
+ return Bytes.wrap(out.toByteArray());
+ } catch (IOException e) {
+ LOG.error("I/O error writing zip entry: {}", e.getMessage(), e);
+ fail("I/O error writing zip entry");
+ }
+
+ return null;
+ }
+
+ @Override
+ public Map<String, String> messageHeaders(Bytes text, int current) {
+ Map<String, String> headers = new HashMap<>();
+
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaOperation",
+ "createFunction");
+
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaRole",
+ "admin");
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaRuntime",
+ "java8");
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaHandler",
+ "org.apache.camel.kafkaconnector.SomeHandler");
+
+ return headers;
+ }
+ }
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-aws2-lambda-kafka-connector"};
+ }
+
+ @BeforeEach
+ public void setUp() {
+ client = AWSSDKClientUtils.newLambdaClient();
+
+ function = "function-" + TestUtils.randomWithRange(0, 100);
+ LOG.debug("Using function {} for the test", function);
+ }
+
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ while (true) {
+ ListFunctionsResponse response = client.listFunctions();
+
+ for (FunctionConfiguration functionConfiguration : response.functions()) {
+ LOG.info("Retrieved function {}", functionConfiguration.functionName());
+
+ if (functionConfiguration.functionName().equals(function)) {
+ received = 1;
+ return;
+ }
+ }
+
+ if (!waitForData()) {
+ break;
+ }
+ }
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ if (latch.await(110, TimeUnit.SECONDS)) {
+ assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
+ } else {
+ fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
+ received, expect));
+ }
+ }
+
+ @Test
+ @Timeout(90)
+ public void testBasicSendReceive() throws Exception {
+ Properties amazonProperties = awsService.getConnectionProperties();
+ String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+ ConnectorPropertyFactory testProperties = CamelAWSLambdaPropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withConfiguration(TestLambda2Configuration.class.getName())
+ .withAmazonConfig(amazonProperties)
+ .withSinkPathFunction(function)
+ .withSinkEndpointOperation("createFunction");
+
+ runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
+ }
+
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/TestLambda2Configuration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/TestLambda2Configuration.java
new file mode 100644
index 0000000..cb11efd
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/TestLambda2Configuration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.lambda.sink;
+
+import org.apache.camel.component.aws2.lambda.Lambda2Configuration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import software.amazon.awssdk.services.lambda.LambdaClient;
+
+/**
+ * Lambda configuration for the tests: the Lambda client is obtained from
+ * {@code AWSSDKClientUtils.newLambdaClient()} on first access and reused afterwards.
+ */
+public class TestLambda2Configuration extends Lambda2Configuration {
+    private LambdaClient cachedClient;
+
+    /**
+     * Returns the Lambda client, creating it on the first call.
+     *
+     * @return the client instance held by this configuration
+     */
+    @Override
+    public LambdaClient getAwsLambdaClient() {
+        if (cachedClient != null) {
+            return cachedClient;
+        }
+
+        cachedClient = AWSSDKClientUtils.newLambdaClient();
+        return cachedClient;
+    }
+}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/ByteProducerPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/ByteProducerPropertyFactory.java
new file mode 100644
index 0000000..2d48d13
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/ByteProducerPropertyFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.clients.kafka;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+/**
+ * Producer property factory that configures a {@link StringSerializer} for record keys and a
+ * {@link BytesSerializer} for record values.
+ */
+public class ByteProducerPropertyFactory implements ProducerPropertyFactory {
+    private final String bootstrapServer;
+
+    /**
+     * Creates the factory for the given bootstrap server.
+     *
+     * @param bootstrapServer the address of the server in the format
+     *                        PLAINTEXT://${address}:${port}
+     */
+    public ByteProducerPropertyFactory(String bootstrapServer) {
+        this.bootstrapServer = bootstrapServer;
+    }
+
+    /**
+     * Builds a new set of producer properties; every call gets a new random client id.
+     *
+     * @return the producer properties
+     */
+    @Override
+    public Properties getProperties() {
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+
+        final String clientId = UUID.randomUUID().toString();
+        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+
+        final String keySerializer = StringSerializer.class.getName();
+        final String valueSerializer = BytesSerializer.class.getName();
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+
+        return producerConfig;
+    }
+}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
index 4e33566..c3cd953 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
@@ -74,8 +74,18 @@ public class KafkaClient<K, V> {
* PLAINTEXT://${address}:${port}
*/
public KafkaClient(String bootstrapServer) {
- consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootstrapServer);
- producerPropertyFactory = new DefaultProducerPropertyFactory(bootstrapServer);
+ this(new DefaultConsumerPropertyFactory(bootstrapServer), new DefaultProducerPropertyFactory(bootstrapServer));
+ }
+
+ /**
+ * Constructs the Kafka client using the given consumer and producer property factories
+ *
+ * @param consumerPropertyFactory a property factory for Kafka client consumers
+ * @param producerPropertyFactory a property factory for Kafka client producers
+ */
+ public KafkaClient(ConsumerPropertyFactory consumerPropertyFactory, ProducerPropertyFactory producerPropertyFactory) {
+ this.consumerPropertyFactory = consumerPropertyFactory;
+ this.producerPropertyFactory = producerPropertyFactory;
producer = new KafkaProducer<>(producerPropertyFactory.getProperties());
consumer = new KafkaConsumer<>(consumerPropertyFactory.getProperties());
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java
index 28d3d0d..477fafa 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java
@@ -38,11 +38,15 @@ public abstract class AbstractTestMessageProducer<T> implements TestMessageProdu
}
public AbstractTestMessageProducer(String bootstrapServer, String topicName, int count) {
- this.kafkaClient = new KafkaClient<>(bootstrapServer);
+ this.kafkaClient = createKafkaClient(bootstrapServer);
this.topicName = topicName;
this.count = count;
}
+ protected KafkaClient<String, T> createKafkaClient(String bootstrapServer) {
+ return new KafkaClient<>(bootstrapServer);
+ }
+
public void produceMessages() throws ExecutionException, InterruptedException {
LOG.trace("Producing messages ...");
for (int i = 0; i < count; i++) {