You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/01/28 13:40:10 UTC

[camel-kafka-connector] branch master updated: Added AWS v2 S3 sink test case

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 513f1e2  Added AWS v2 S3 sink test case
513f1e2 is described below

commit 513f1e2aa85aa0d71bba16abed7d97abbbe63cc1
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Jan 28 14:01:28 2021 +0100

    Added AWS v2 S3 sink test case
---
 .../kafkaconnector/aws/v2/s3/common/S3Utils.java   |  97 +++++++++++++
 .../s3/{source => common}/TestS3Configuration.java |   2 +-
 .../aws/v2/s3/sink/CamelAWSS3PropertyFactory.java  |  76 +++++++++++
 .../aws/v2/s3/sink/CamelSinkAWSS3ITCase.java       | 152 +++++++++++++++++++++
 .../aws/v2/s3/source/CamelSourceAWSS3ITCase.java   |  52 +------
 5 files changed, 332 insertions(+), 47 deletions(-)

diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java
new file mode 100644
index 0000000..25e0ec7
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.s3.common;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public final class S3Utils {
+    private static final Logger LOG = LoggerFactory.getLogger(S3Utils.class);
+
+    private S3Utils() {
+        // Static helpers only: this class is not meant to be instantiated
+    }
+
+    /**
+     * Lists the objects returned by a single listObjectsV2 call on the bucket. Continuation
+     * tokens are not followed, so only the contents of that one response are returned.
+     * @param s3Client the S3Client instance used to list the objects
+     * @param bucketName a String containing the bucket name
+     * @return the contents of the listing response
+     */
+    public static List<S3Object> listObjects(S3Client s3Client, String bucketName) {
+        try {
+            ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(bucketName).build();
+            return s3Client.listObjectsV2(request).contents();
+        } catch (Exception failure) {
+            // Leave a trace in the debug log, then hand the same exception back to the caller
+            LOG.debug("Error listing: {}", failure.getMessage(), failure);
+            throw failure;
+        }
+    }
+
+    /**
+     * Empties an S3 bucket and then deletes it using the provided client. Based on the AWS documentation:
+     * https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html
+     *
+     * AWS SDK v1 doc for reference:
+     * https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java
+     * @param s3Client the S3Client instance used to empty and delete the bucket
+     * @param bucketName a String containing the bucket name
+     */
+    public static void deleteBucket(S3Client s3Client, String bucketName) {
+        // Objects are deleted one by one, page by page. As the AWS documentation explains, this is
+        // sufficient for non versioned buckets; on versioned buckets deleting an object only inserts
+        // a delete marker, and removing the object versions is not handled by this method.
+        String continuationToken = null;
+        ListObjectsV2Response page;
+
+        do {
+            page = s3Client.listObjectsV2(ListObjectsV2Request.builder()
+                    .bucket(bucketName)
+                    .continuationToken(continuationToken)
+                    .build());
+
+            for (S3Object entry : page.contents()) {
+                s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(entry.key()).build());
+            }
+
+            continuationToken = page.nextContinuationToken();
+        } while (page.isTruncated());
+
+        s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build());
+    }
+
+    /**
+     * Creates a bucket using the provided client. Only the bucket name is set on the request.
+     * @param s3Client the S3Client instance used to create the bucket
+     * @param bucketName a String containing the bucket name
+     */
+    public static void createBucket(S3Client s3Client, String bucketName) {
+        s3Client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build());
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/TestS3Configuration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/TestS3Configuration.java
similarity index 95%
rename from tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/TestS3Configuration.java
rename to tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/TestS3Configuration.java
index ff7c897..8932d0d 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/TestS3Configuration.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/TestS3Configuration.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kafkaconnector.aws.v2.s3.source;
+package org.apache.camel.kafkaconnector.aws.v2.s3.common;
 
 import org.apache.camel.component.aws2.s3.AWS2S3Configuration;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelAWSS3PropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelAWSS3PropertyFactory.java
new file mode 100644
index 0000000..23264b5
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelAWSS3PropertyFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.s3.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+
+public final class CamelAWSS3PropertyFactory extends SinkConnectorPropertyFactory<CamelAWSS3PropertyFactory> {
+    public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+    public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+    static {
+        SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-s3.accessKey");
+        SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-s3.secretKey");
+        SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-s3.region");
+        // Same three settings, with dash-separated names for the two-word keys
+        KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-s3.access-key");
+        KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-s3.secret-key");
+        KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-s3.region");
+    }
+
+    private CamelAWSS3PropertyFactory() {
+        // Instances are obtained through basic()
+    }
+
+    public CamelAWSS3PropertyFactory withAmazonConfig(Properties amazonConfigs) {
+        // When no style is given, the camelCase (SPRING_STYLE) property names are used
+        return withAmazonConfig(amazonConfigs, SPRING_STYLE);
+    }
+
+    public CamelAWSS3PropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+        return this;
+    }
+
+    public CamelAWSS3PropertyFactory withBucketNameOrArn(String bucketNameOrArn) {
+        return setProperty("camel.sink.path.bucketNameOrArn", bucketNameOrArn);
+    }
+
+    public CamelAWSS3PropertyFactory withConfiguration(String configurationClass) {
+        return setProperty("camel.component.aws2-s3.configuration", classRef(configurationClass));
+    }
+
+    public CamelAWSS3PropertyFactory withAutoCreateBucket(boolean value) {
+        return setProperty("camel.sink.endpoint.autoCreateBucket", value);
+    }
+
+    public static CamelAWSS3PropertyFactory basic() {
+        return new CamelAWSS3PropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelAws2s3SinkConnectorConfig")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
new file mode 100644
index 0000000..fe64695
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.s3.sink;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
+import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils;
+import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.AWSCommon;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.createBucket;
+import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.deleteBucket;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkAWSS3ITCase extends CamelSinkAWSTestSupport {
+    @RegisterExtension
+    public static AWSService service = AWSServiceFactory.createS3Service();
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSS3ITCase.class);
+
+    private S3Client awsS3Client;
+    private String bucketName;
+
+    private volatile int received;
+    private int expect = 10;
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> headers = new HashMap<>();
+        // One object key per message index, always in the bucket created by setUp()
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3Key", "file" + current + ".txt");
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsS3BucketName", bucketName);
+
+        return headers;
+    }
+
+    /** Lists the bucket until at least {@code expect} objects are found or waitForData() returns false. */
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            while (true) {
+                List<S3Object> objectList = S3Utils.listObjects(awsS3Client, bucketName);
+
+                for (S3Object object : objectList) {
+                    LOG.info("Object key: {}", object.key());
+                }
+
+                received = objectList.size();
+                if (received >= expect) {
+                    return;
+                }
+                // Not enough objects yet: list again unless waitForData() says to stop
+                if (!waitForData()) {
+                    return;
+                }
+            }
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    /** Waits up to 110 seconds for the latch, then requires exactly {@code expect} objects to have been counted. */
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
+                    received, expect));
+        }
+    }
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-aws2-s3-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        awsS3Client = AWSSDKClientUtils.newS3Client();
+        received = 0;
+        bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100);
+
+        try {
+            createBucket(awsS3Client, bucketName);
+        } catch (Exception e) {
+            LOG.error("Unable to create bucket: {}", e.getMessage(), e);
+            fail("Unable to create bucket");
+        }
+    }
+
+    @AfterEach
+    public void tearDown() {
+        try {
+            deleteBucket(awsS3Client, bucketName);
+        } catch (Exception e) {
+            // Cleanup is best-effort: a failed deletion is logged and does not fail the test
+            LOG.warn("Unable to delete bucket: {}", e.getMessage(), e);
+        }
+    }
+
+    @Test
+    public void testBasicSendReceive() throws Exception {
+        Properties amazonProperties = service.getConnectionProperties();
+        String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+        ConnectorPropertyFactory testProperties = CamelAWSS3PropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withConfiguration(TestS3Configuration.class.getName())
+                .withAmazonConfig(amazonProperties)
+                .withBucketNameOrArn(bucketName)
+                .withAutoCreateBucket(true);
+
+        runTest(testProperties, topicName, expect);
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
index bae5c77..a1a3e9e 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
@@ -44,14 +45,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
-import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
-import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.S3Object;
 
+import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.createBucket;
+import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.deleteBucket;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -79,41 +76,6 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
         return new String[] {"camel-aws2-s3-kafka-connector"};
     }
 
-    /**
-     * Delete an S3 bucket using the provided client. Coming from AWS documentation:
-     * https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html
-     *
-     * AWS SDK v1 doc for reference:
-     * https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java
-     * @param s3Client the AmazonS3 client instance used to delete the bucket
-     * @param bucketName a String containing the bucket name
-     */
-    private static void deleteBucket(S3Client s3Client, String bucketName) {
-        // Delete all objects from the bucket. This is sufficient
-        // for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts
-        // delete markers for all objects, but doesn't delete the object versions.
-        // To delete objects from versioned buckets, delete all of the object versions before deleting
-        // the bucket (see below for an example).
-        ListObjectsV2Request listObjectsRequest = ListObjectsV2Request.builder()
-                .bucket(bucketName)
-                .build();
-
-        ListObjectsV2Response objectListing;
-        do {
-            objectListing = s3Client.listObjectsV2(listObjectsRequest);
-
-            for (S3Object s3Object : objectListing.contents()) {
-                s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(s3Object.key()).build());
-            }
-
-            listObjectsRequest = ListObjectsV2Request.builder().bucket(bucketName)
-                    .continuationToken(objectListing.nextContinuationToken())
-                    .build();
-        } while (objectListing.isTruncated());
-
-        s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build());
-    }
-
     @BeforeEach
     public void setUp() {
         awsS3Client = AWSSDKClientUtils.newS3Client();
@@ -121,17 +83,15 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
         bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100);
 
         try {
-            CreateBucketRequest request = CreateBucketRequest.builder()
-                    .bucket(bucketName)
-                    .build();
-
-            awsS3Client.createBucket(request);
+            createBucket(awsS3Client, bucketName);
         } catch (Exception e) {
             LOG.error("Unable to create bucket: {}", e.getMessage(), e);
             fail("Unable to create bucket");
         }
     }
 
+
+
     @AfterEach
     public void tearDown() {
         try {