You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/11/19 16:50:02 UTC
[camel-kafka-connector] branch camel-master updated: Updated AWS
test services according to interface changes from camel 3.7
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/camel-master by this push:
new 5ab5966 Updated AWS test services according to interface changes from camel 3.7
5ab5966 is described below
commit 5ab596662a0d7d47b0bf37ddc2f5b2d8dba0c3aa
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Nov 19 09:34:41 2020 +0100
Updated AWS test services according to interface changes from camel 3.7
---
.../aws/v1/clients/AWSClientUtils.java | 89 ----------------------
.../source/CamelSourceAWSKinesisITCase.java | 5 +-
.../aws/v1/s3/source/CamelSourceAWSS3ITCase.java | 67 +++++++++++++++-
.../aws/v1/sns/sink/CamelSinkAWSSNSITCase.java | 6 +-
.../aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java | 6 +-
.../aws/v1/sqs/source/CamelSourceAWSSQSITCase.java | 6 +-
.../aws/v2/clients/AWSSDKClientUtils.java | 67 ----------------
.../source/CamelSourceAWSKinesisITCase.java | 5 +-
.../aws/v2/s3/source/CamelSourceAWSS3ITCase.java | 48 +++++++++++-
.../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java | 6 +-
.../aws/v2/sqs/source/CamelSourceAWSSQSITCase.java | 6 +-
11 files changed, 128 insertions(+), 183 deletions(-)
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/clients/AWSClientUtils.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/clients/AWSClientUtils.java
deleted file mode 100644
index 93399a8..0000000
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/clients/AWSClientUtils.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.aws.v1.clients;
-
-import java.util.Iterator;
-
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.ListVersionsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-import com.amazonaws.services.s3.model.S3VersionSummary;
-import com.amazonaws.services.s3.model.VersionListing;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class AWSClientUtils {
- private static final Logger LOG = LoggerFactory.getLogger(AWSClientUtils.class);
-
- private AWSClientUtils() {
- }
-
- /**
- * Delete an S3 bucket using the provided client. Coming from AWS documentation:
- * https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java
- *
- * @param s3Client
- * the AmazonS3 client instance used to delete the bucket
- * @param bucketName
- * a String containing the bucket name
- */
- public static void deleteBucket(AmazonS3 s3Client, String bucketName) {
- // Delete all objects from the bucket. This is sufficient
- // for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts
- // delete markers for all objects, but doesn't delete the object versions.
- // To delete objects from versioned buckets, delete all of the object versions before deleting
- // the bucket (see below for an example).
- ObjectListing objectListing = s3Client.listObjects(bucketName);
- while (true) {
- Iterator<S3ObjectSummary> objIter = objectListing.getObjectSummaries().iterator();
- while (objIter.hasNext()) {
- s3Client.deleteObject(bucketName, objIter.next().getKey());
- }
-
- // If the bucket contains many objects, the listObjects() call
- // might not return all of the objects in the first listing. Check to
- // see whether the listing was truncated. If so, retrieve the next page of objects
- // and delete them.
- if (objectListing.isTruncated()) {
- objectListing = s3Client.listNextBatchOfObjects(objectListing);
- } else {
- break;
- }
- }
-
- // Delete all object versions (required for versioned buckets).
- VersionListing versionList = s3Client.listVersions(new ListVersionsRequest().withBucketName(bucketName));
- while (true) {
- Iterator<S3VersionSummary> versionIter = versionList.getVersionSummaries().iterator();
- while (versionIter.hasNext()) {
- S3VersionSummary vs = versionIter.next();
- s3Client.deleteVersion(bucketName, vs.getKey(), vs.getVersionId());
- }
-
- if (versionList.isTruncated()) {
- versionList = s3Client.listNextBatchOfVersions(versionList);
- } else {
- break;
- }
- }
-
- // After all objects and object versions are deleted, delete the bucket.
- s3Client.deleteBucket(bucketName);
- }
-}
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java
index 0dad306..d16f23c 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java
@@ -37,6 +37,7 @@ import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.clients.AWSClientUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws.services.AWSServiceFactory;
@@ -56,7 +57,7 @@ import static org.junit.jupiter.api.Assertions.fail;
@Testcontainers
public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
@RegisterExtension
- public static AWSService<AmazonKinesis> service = AWSServiceFactory.createKinesisService();
+ public static AWSService service = AWSServiceFactory.createKinesisService();
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class);
@@ -124,7 +125,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
public void setUp() {
streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100);
- awsKinesisClient = service.getClient();
+ awsKinesisClient = AWSClientUtils.newKinesisClient();
received = 0;
createStream();
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java
index 05e2319..21af556 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java
@@ -18,16 +18,22 @@
package org.apache.camel.kafkaconnector.aws.v1.s3.source;
import java.io.File;
+import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
-import org.apache.camel.kafkaconnector.aws.v1.clients.AWSClientUtils;
+import com.amazonaws.services.s3.model.ListVersionsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.S3VersionSummary;
+import com.amazonaws.services.s3.model.VersionListing;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.clients.AWSClientUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -49,7 +55,7 @@ import static org.junit.jupiter.api.Assertions.fail;
public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
@RegisterExtension
- public static AWSService<AmazonS3> service = AWSServiceFactory.createS3Service();
+ public static AWSService service = AWSServiceFactory.createS3Service();
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);
private AmazonS3 awsS3Client;
@@ -57,6 +63,59 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
private volatile int received;
private final int expect = 10;
+ /**
+ * Empties an S3 bucket and then deletes it using the provided client. Adapted from the AWS documentation:
+ * https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java
+ *
+ * @param s3Client
+ * the AmazonS3 client instance used to delete the objects, the object versions and the bucket
+ * @param bucketName
+ * a String containing the name of the bucket to empty and delete
+ */
+ public static void deleteBucket(AmazonS3 s3Client, String bucketName) {
+ // Step 1: delete all objects from the bucket, one listing page at a time. This is sufficient
+ // for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts
+ // delete markers for all objects, but doesn't delete the object versions.
+ // To delete objects from versioned buckets, all of the object versions must be deleted before deleting
+ // the bucket; that is done by the second loop further down in this method.
+ ObjectListing objectListing = s3Client.listObjects(bucketName);
+ while (true) {
+ Iterator<S3ObjectSummary> objIter = objectListing.getObjectSummaries().iterator();
+ while (objIter.hasNext()) {
+ s3Client.deleteObject(bucketName, objIter.next().getKey());
+ }
+
+ // If the bucket contains many objects, the listObjects() call
+ // might not return all of the objects in the first listing. Check
+ // whether the listing was truncated. If so, retrieve the next page of objects
+ // and delete them too; otherwise leave the loop.
+ if (objectListing.isTruncated()) {
+ objectListing = s3Client.listNextBatchOfObjects(objectListing);
+ } else {
+ break;
+ }
+ }
+
+ // Step 2: delete all object versions, page by page (required for versioned buckets).
+ VersionListing versionList = s3Client.listVersions(new ListVersionsRequest().withBucketName(bucketName));
+ while (true) {
+ Iterator<S3VersionSummary> versionIter = versionList.getVersionSummaries().iterator();
+ while (versionIter.hasNext()) {
+ S3VersionSummary vs = versionIter.next();
+ s3Client.deleteVersion(bucketName, vs.getKey(), vs.getVersionId());
+ }
+
+ if (versionList.isTruncated()) {
+ versionList = s3Client.listNextBatchOfVersions(versionList);
+ } else {
+ break;
+ }
+ }
+
+ // Step 3: after all objects and object versions are deleted, delete the bucket itself.
+ s3Client.deleteBucket(bucketName);
+ }
+
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-aws-s3-kafka-connector"};
@@ -64,7 +123,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
- awsS3Client = service.getClient();
+ awsS3Client = AWSClientUtils.newS3Client();
received = 0;
try {
@@ -78,7 +137,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
@AfterEach
public void tearDown() {
try {
- AWSClientUtils.deleteBucket(awsS3Client, AWSCommon.DEFAULT_S3_BUCKET);
+ deleteBucket(awsS3Client, AWSCommon.DEFAULT_S3_BUCKET);
} catch (Exception e) {
LOG.warn("Unable to delete bucked: {}", e.getMessage(), e);
}
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
index 93a709c..4675a30 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
@@ -26,13 +26,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.clients.AWSClientUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -52,7 +52,7 @@ import static org.junit.jupiter.api.Assertions.fail;
@Testcontainers
public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
@RegisterExtension
- public static AWSService<AmazonSQS> service = AWSServiceFactory.createSNSService();
+ public static AWSService service = AWSServiceFactory.createSNSService();
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSNSITCase.class);
@@ -70,7 +70,7 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
- awsSqsClient = new AWSSQSClient(service.getClient());
+ awsSqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient());
queueName = AWSCommon.DEFAULT_SQS_QUEUE_FOR_SNS + "-" + TestUtils.randomWithRange(0, 1000);
sqsQueueUrl = awsSqsClient.getQueue(queueName);
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
index c595c67..b993ad4 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -25,13 +25,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.clients.AWSClientUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -53,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.fail;
@Testcontainers
public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
@RegisterExtension
- public static AWSService<AmazonSQS> awsService = AWSServiceFactory.createSQSService();
+ public static AWSService awsService = AWSServiceFactory.createSQSService();
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class);
@@ -71,7 +71,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
- awssqsClient = new AWSSQSClient(awsService.getClient());
+ awssqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient());
queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000);
queueUrl = awssqsClient.getQueue(queueName);
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java
index 786042b..69d9b46 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java
@@ -21,12 +21,12 @@ import java.util.Properties;
import java.util.concurrent.ExecutionException;
import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sqs.AmazonSQS;
import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.clients.AWSClientUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -48,7 +48,7 @@ import static org.junit.jupiter.api.Assertions.fail;
@Testcontainers
public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
@RegisterExtension
- public static AWSService<AmazonSQS> service = AWSServiceFactory.createSQSService();
+ public static AWSService service = AWSServiceFactory.createSQSService();
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class);
@@ -66,7 +66,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
- awssqsClient = new AWSSQSClient(service.getClient());
+ awssqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient());
queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000);
queueUrl = awssqsClient.getQueue(queueName);
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSDKClientUtils.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSDKClientUtils.java
deleted file mode 100644
index 5fd3339..0000000
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSDKClientUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.aws.v2.clients;
-
-import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
-import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
-import software.amazon.awssdk.services.s3.model.S3Object;
-
-public final class AWSSDKClientUtils {
- private AWSSDKClientUtils() {
-
- }
-
- /**
- * Delete an S3 bucket using the provided client. Coming from AWS documentation:
- * https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html
- *
- * AWS SDK v1 doc for reference:
- * https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java
- * @param s3Client the AmazonS3 client instance used to delete the bucket
- * @param bucketName a String containing the bucket name
- */
- public static void deleteBucket(S3Client s3Client, String bucketName) {
- // Delete all objects from the bucket. This is sufficient
- // for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts
- // delete markers for all objects, but doesn't delete the object versions.
- // To delete objects from versioned buckets, delete all of the object versions before deleting
- // the bucket (see below for an example).
- ListObjectsV2Request listObjectsRequest = ListObjectsV2Request.builder()
- .bucket(bucketName)
- .build();
-
- ListObjectsV2Response objectListing;
- do {
- objectListing = s3Client.listObjectsV2(listObjectsRequest);
-
- for (S3Object s3Object : objectListing.contents()) {
- s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(s3Object.key()).build());
- }
-
- listObjectsRequest = ListObjectsV2Request.builder().bucket(bucketName)
- .continuationToken(objectListing.nextContinuationToken())
- .build();
- } while (objectListing.isTruncated());
-
- s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build());
- }
-
-}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
index b5cca54..3c25d1b 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
@@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterEach;
@@ -60,7 +61,7 @@ import static org.junit.jupiter.api.Assertions.fail;
@Testcontainers
public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
@RegisterExtension
- public static AWSService<KinesisClient> awsService = AWSServiceFactory.createKinesisService();
+ public static AWSService awsService = AWSServiceFactory.createKinesisService();
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class);
@@ -154,7 +155,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
public void setUp() {
streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100);
- kinesisClient = awsService.getClient();
+ kinesisClient = AWSSDKClientUtils.newKinesisClient();
received = 0;
createStream();
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
index 43f1222..a02a6bd 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
@@ -22,7 +22,6 @@ import java.net.URL;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSDKClientUtils;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
@@ -30,6 +29,7 @@ import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterEach;
@@ -43,7 +43,12 @@ import org.testcontainers.junit.jupiter.Testcontainers;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Object;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@@ -52,7 +57,7 @@ import static org.junit.jupiter.api.Assertions.fail;
public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
@RegisterExtension
- public static AWSService<S3Client> service = AWSServiceFactory.createS3Service();
+ public static AWSService service = AWSServiceFactory.createS3Service();
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);
private S3Client awsS3Client;
@@ -66,9 +71,44 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
return new String[] {"camel-aws2-s3-kafka-connector"};
}
+ /**
+ * Deletes the objects of an S3 bucket and then the bucket itself, using the provided client. See:
+ * https://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html
+ *
+ * AWS SDK v1 doc for reference:
+ * https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java
+ * @param s3Client the S3Client (AWS SDK v2) instance used to delete the objects and the bucket
+ * @param bucketName a String containing the name of the bucket to delete
+ */
+ private static void deleteBucket(S3Client s3Client, String bucketName) {
+ // Delete all objects from the bucket, one listing page at a time. This is sufficient
+ // for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts
+ // delete markers for all objects, but doesn't delete the object versions.
+ // NOTE: this method lists and deletes objects only; it never deletes object versions, so a
+ // versioned bucket would not be fully emptied before the final deleteBucket call.
+ ListObjectsV2Request listObjectsRequest = ListObjectsV2Request.builder()
+ .bucket(bucketName)
+ .build();
+
+ ListObjectsV2Response objectListing;
+ do {
+ objectListing = s3Client.listObjectsV2(listObjectsRequest);
+
+ for (S3Object s3Object : objectListing.contents()) {
+ s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(s3Object.key()).build());
+ }
+
+ listObjectsRequest = ListObjectsV2Request.builder().bucket(bucketName)
+ .continuationToken(objectListing.nextContinuationToken())
+ .build();
+ } while (objectListing.isTruncated());
+
+ s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build());
+ }
+
@BeforeEach
public void setUp() {
- awsS3Client = service.getClient();
+ awsS3Client = AWSSDKClientUtils.newS3Client();
received = 0;
bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100);
@@ -87,7 +127,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
@AfterEach
public void tearDown() {
try {
- AWSSDKClientUtils.deleteBucket(awsS3Client, bucketName);
+ deleteBucket(awsS3Client, bucketName);
} catch (Exception e) {
LOG.warn("Unable to delete bucked: {}", e.getMessage(), e);
}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
index 13daee7..b30deec 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -32,6 +32,7 @@ import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -44,7 +45,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Testcontainers;
import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -53,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.fail;
@Testcontainers
public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
@RegisterExtension
- public static AWSService<SqsClient> awsService = AWSServiceFactory.createSQSService();
+ public static AWSService awsService = AWSServiceFactory.createSQSService();
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class);
@@ -71,7 +71,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
- awssqsClient = new AWSSQSClient(awsService.getClient());
+ awssqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient());
queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000);
String queueUrl = awssqsClient.getOrCreateQueue(queueName);
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java
index c2e6539..a3f2111 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java
@@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterEach;
@@ -39,14 +40,13 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.sqs.SqsClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
@RegisterExtension
- public static AWSService<SqsClient> service = AWSServiceFactory.createSQSService();
+ public static AWSService service = AWSServiceFactory.createSQSService();
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class);
@@ -63,7 +63,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
- awssqsClient = new AWSSQSClient(service.getClient());
+ awssqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient());
queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000);
// TODO: this is a work-around for CAMEL-15833