You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/03/01 17:54:40 UTC

[camel-kafka-connector] branch camel-master updated: Add integration tests for AWS v2 lambda

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/camel-master by this push:
     new 28db5ba  Add integration tests for AWS v2 lambda
28db5ba is described below

commit 28db5ba3f9b3b143af97298740de620c6b5cc381
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Feb 25 11:31:36 2021 +0100

    Add integration tests for AWS v2 lambda
---
 tests/itests-aws-v2/pom.xml                        |   5 +
 .../lambda/sink/CamelAWSLambdaPropertyFactory.java |  79 +++++++++
 .../aws/v2/lambda/sink/CamelSinkLambdaITCase.java  | 187 +++++++++++++++++++++
 .../v2/lambda/sink/TestLambda2Configuration.java   |  35 ++++
 .../clients/kafka/ByteProducerPropertyFactory.java |  52 ++++++
 .../common/clients/kafka/KafkaClient.java          |  14 +-
 .../common/test/AbstractTestMessageProducer.java   |   6 +-
 7 files changed, 375 insertions(+), 3 deletions(-)

diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml
index 31b8a89..185a1c1 100644
--- a/tests/itests-aws-v2/pom.xml
+++ b/tests/itests-aws-v2/pom.xml
@@ -92,6 +92,11 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-aws2-sns</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-aws2-lambda</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelAWSLambdaPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelAWSLambdaPropertyFactory.java
new file mode 100644
index 0000000..e8a1433
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelAWSLambdaPropertyFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.lambda.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+
+public class CamelAWSLambdaPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSLambdaPropertyFactory> {
+    public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+    public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+    static {
+        SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-lambda.accessKey");
+        SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-lambda.secretKey");
+        SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-lambda.region");
+
+        KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-lambda.access-key");
+        KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-lambda.secret-key");
+        KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-lambda.region");
+    }
+
+
+    public CamelAWSLambdaPropertyFactory withAmazonConfig(Properties amazonConfigs) {
+        return withAmazonConfig(amazonConfigs, this.SPRING_STYLE);
+    }
+
+    public CamelAWSLambdaPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+
+        return this;
+    }
+
+    public CamelAWSLambdaPropertyFactory withSinkPathFunction(String value) {
+        return setProperty("camel.sink.path.function", value);
+    }
+
+    public CamelAWSLambdaPropertyFactory withSinkEndpointOperation(String value) {
+        return setProperty("camel.sink.endpoint.operation", value);
+    }
+
+    public CamelAWSLambdaPropertyFactory withConfiguration(String value) {
+        return setProperty("camel.component.aws2-lambda.configuration", classRef(value));
+    }
+
+    public CamelAWSLambdaPropertyFactory withPojoRequest(boolean value) {
+        return setProperty("camel.sink.endpoint.pojoRequest", value);
+    }
+
+    public static CamelAWSLambdaPropertyFactory basic() {
+        return new CamelAWSLambdaPropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelAws2lambdaSinkConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.aws2lambda.CamelAws2lambdaSinkConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+
+    }
+
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java
new file mode 100644
index 0000000..e9911ac
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.lambda.sink;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.ByteProducerPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.ConsumerPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.DefaultConsumerPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.clients.kafka.ProducerPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.AbstractTestMessageProducer;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.lambda.LambdaClient;
+import software.amazon.awssdk.services.lambda.model.FunctionConfiguration;
+import software.amazon.awssdk.services.lambda.model.ListFunctionsResponse;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
+public class CamelSinkLambdaITCase extends CamelSinkTestSupport {
+    @RegisterExtension
+    public static AWSService awsService = AWSServiceFactory.createLambdaService();
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkLambdaITCase.class);
+
+    private LambdaClient client;
+    private String function;
+
+    private volatile int received;
+    private final int expect = 1;
+
+
+    private static class CustomProducer extends AbstractTestMessageProducer<Bytes> {
+        public CustomProducer(String bootstrapServer, String topicName, int count) {
+            super(bootstrapServer, topicName, count);
+        }
+
+        @Override
+        protected KafkaClient<String, Bytes> createKafkaClient(String bootstrapServer) {
+            ConsumerPropertyFactory consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootstrapServer);
+            ProducerPropertyFactory producerPropertyFactory = new ByteProducerPropertyFactory(bootstrapServer);
+
+            return new KafkaClient<>(consumerPropertyFactory, producerPropertyFactory);
+        }
+
+        @Override
+        public Bytes testMessageContent(int current) {
+
+            try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                ZipOutputStream zip = new ZipOutputStream(out);
+
+                ZipEntry entry = new ZipEntry("test");
+                zip.putNextEntry(entry);
+                zip.write("hello test".getBytes());
+                zip.closeEntry();
+                zip.finish();
+
+                return Bytes.wrap(out.toByteArray());
+            } catch (IOException e) {
+                LOG.error("I/O error writing zip entry: {}", e.getMessage(), e);
+                fail("I/O error writing zip entry");
+            }
+
+            return null;
+        }
+
+        @Override
+        public Map<String, String> messageHeaders(Bytes text, int current) {
+            Map<String, String> headers = new HashMap<>();
+
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaOperation",
+                    "createFunction");
+
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaRole",
+                    "admin");
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaRuntime",
+                    "java8");
+            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaHandler",
+                    "org.apache.camel.kafkaconnector.SomeHandler");
+
+            return headers;
+        }
+    }
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-aws2-lambda-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        client = AWSSDKClientUtils.newLambdaClient();
+
+        function = "function-" + TestUtils.randomWithRange(0, 100);
+        LOG.debug("Using function {} for the test", function);
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            while (true) {
+                ListFunctionsResponse response = client.listFunctions();
+
+                for (FunctionConfiguration functionConfiguration : response.functions()) {
+                    LOG.info("Retrieved function {}", functionConfiguration.functionName());
+
+                    if (functionConfiguration.functionName().equals(function)) {
+                        received = 1;
+                        return;
+                    }
+                }
+
+                if (!waitForData()) {
+                    break;
+                }
+            }
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
+                    received, expect));
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+        Properties amazonProperties = awsService.getConnectionProperties();
+        String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+        ConnectorPropertyFactory testProperties = CamelAWSLambdaPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withConfiguration(TestLambda2Configuration.class.getName())
+                .withAmazonConfig(amazonProperties)
+                .withSinkPathFunction(function)
+                .withSinkEndpointOperation("createFunction");
+
+        runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect));
+    }
+
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/TestLambda2Configuration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/TestLambda2Configuration.java
new file mode 100644
index 0000000..cb11efd
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/TestLambda2Configuration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.lambda.sink;
+
+import org.apache.camel.component.aws2.lambda.Lambda2Configuration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import software.amazon.awssdk.services.lambda.LambdaClient;
+
+public class TestLambda2Configuration extends Lambda2Configuration {
+    private LambdaClient client;
+
+    @Override
+    public LambdaClient getAwsLambdaClient() {
+        if (client == null) {
+            client = AWSSDKClientUtils.newLambdaClient();
+        }
+
+        return client;
+    }
+}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/ByteProducerPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/ByteProducerPropertyFactory.java
new file mode 100644
index 0000000..2d48d13
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/ByteProducerPropertyFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.clients.kafka;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+public class ByteProducerPropertyFactory implements ProducerPropertyFactory {
+    private final String bootstrapServer;
+
+    /**
+     * Constructs the properties using the given bootstrap server
+     * @param bootstrapServer the address of the server in the format
+     *                       PLAINTEXT://${address}:${port}
+     */
+    public ByteProducerPropertyFactory(String bootstrapServer) {
+        this.bootstrapServer = bootstrapServer;
+    }
+
+    @Override
+    public Properties getProperties() {
+        Properties props = new Properties();
+
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                BytesSerializer.class.getName());
+
+        return props;
+    }
+}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
index 4e33566..c3cd953 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java
@@ -74,8 +74,18 @@ public class KafkaClient<K, V> {
      *                        PLAINTEXT://${address}:${port}
      */
     public KafkaClient(String bootstrapServer) {
-        consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootstrapServer);
-        producerPropertyFactory = new DefaultProducerPropertyFactory(bootstrapServer);
+        this(new DefaultConsumerPropertyFactory(bootstrapServer), new DefaultProducerPropertyFactory(bootstrapServer));
+    }
+
+    /**
+     * Constructs the properties using the given bootstrap server
+     *
+     * @param consumerPropertyFactory a property factory for Kafka client consumers
+     * @param producerPropertyFactory a property factory for Kafka client producers
+     */
+    public KafkaClient(ConsumerPropertyFactory consumerPropertyFactory, ProducerPropertyFactory producerPropertyFactory) {
+        this.consumerPropertyFactory = consumerPropertyFactory;
+        this.producerPropertyFactory = producerPropertyFactory;
 
         producer = new KafkaProducer<>(producerPropertyFactory.getProperties());
         consumer = new KafkaConsumer<>(consumerPropertyFactory.getProperties());
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java
index 28d3d0d..477fafa 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java
@@ -38,11 +38,15 @@ public abstract class AbstractTestMessageProducer<T> implements TestMessageProdu
     }
 
     public AbstractTestMessageProducer(String bootstrapServer, String topicName, int count) {
-        this.kafkaClient = new KafkaClient<>(bootstrapServer);
+        this.kafkaClient = createKafkaClient(bootstrapServer);
         this.topicName = topicName;
         this.count = count;
     }
 
+    protected KafkaClient<String, T> createKafkaClient(String bootstrapServer) {
+        return new KafkaClient<>(bootstrapServer);
+    }
+
     public void produceMessages() throws ExecutionException, InterruptedException {
         LOG.trace("Producing messages ...");
         for (int i = 0; i < count; i++) {