You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/10 10:11:43 UTC
[camel-kafka-connector] 03/14: Converted AWS source tests to use
the reusable source base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 0a745841f2d6466be18ce183879478acd6c37916
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 13:50:51 2021 +0100
Converted AWS source tests to use the reusable source base class
Includes:
- AWS v2 Kinesis source
- AWS v2 S3 source
- AWS v2 SQS source
---
.../source/CamelSourceAWSKinesisITCase.java | 54 ++-----
.../kafkaconnector/aws/v2/s3/common/S3Utils.java | 32 ++++
.../aws/v2/s3/source/CamelSourceAWSS3ITCase.java | 164 +++++----------------
.../source/CamelSourceAWSS3LargeFilesITCase.java | 143 ++++++++++++++++++
.../aws/v2/sqs/source/CamelSourceAWSSQSITCase.java | 51 +++----
.../common/test/CamelSourceTestSupport.java | 18 ++-
6 files changed, 261 insertions(+), 201 deletions(-)
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
index e19d8bd..d9e5ac5 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
@@ -20,15 +20,14 @@ package org.apache.camel.kafkaconnector.aws.v2.kinesis.source;
import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfiguration;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -36,8 +35,6 @@ import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import static org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.createStream;
@@ -47,16 +44,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
+public class CamelSourceAWSKinesisITCase extends CamelSourceTestSupport {
@RegisterExtension
public static AWSService awsService = AWSServiceFactory.createKinesisService();
- private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class);
private String streamName;
private KinesisClient kinesisClient;
+ private String topicName;
- private volatile int received;
private final int expect = 10;
@Override
@@ -66,10 +62,10 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
+ topicName = getTopicForTest(this);
streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100);
kinesisClient = AWSSDKClientUtils.newKinesisClient();
- received = 0;
createStream(kinesisClient, streamName);
}
@@ -80,45 +76,28 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
deleteStream(kinesisClient, streamName);
}
- private boolean checkRecord(ConsumerRecord<String, String> record) {
- LOG.debug("Received: {}", record.value());
- received++;
-
- if (received == expect) {
- return false;
- }
-
- return true;
- }
-
-
-
- public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
+ protected void produceTestData() {
putRecords(kinesisClient, streamName, expect);
- LOG.debug("Initialized the connector and put the data for the test execution");
+ }
- LOG.debug("Creating the consumer ...");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
- LOG.debug("Created the consumer ...");
+ protected void verifyMessages(TestMessageConsumer<?> consumer) {
+ int received = consumer.consumedMessages().size();
assertEquals(received, expect, "Didn't process the expected amount of messages");
}
+
@Test
@Timeout(120)
public void testBasicSendReceive() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withAmazonConfig(awsService.getConnectionProperties())
.withConfiguration(TestKinesisConfiguration.class.getName())
.withStreamName(streamName);
- runtTest(connectorPropertyFactory);
+ runTest(connectorPropertyFactory, topicName, expect);
}
@Test
@@ -126,12 +105,12 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withAmazonConfig(awsService.getConnectionProperties(), CamelAWSKinesisPropertyFactory.KAFKA_STYLE)
.withConfiguration(TestKinesisConfiguration.class.getName())
.withStreamName(streamName);
- runtTest(connectorPropertyFactory);
+ runTest(connectorPropertyFactory, topicName, expect);
}
@Test
@@ -139,13 +118,12 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withAmazonConfig(awsService.getConnectionProperties())
.withConfiguration(TestKinesisConfiguration.class.getName())
.withUrl(streamName)
.buildUrl();
- runtTest(connectorPropertyFactory);
+ runTest(connectorPropertyFactory, topicName, expect);
}
-
}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java
index 25e0ec7..f1e36df 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java
@@ -17,6 +17,8 @@
package org.apache.camel.kafkaconnector.aws.v2.s3.common;
+import java.io.File;
+import java.io.IOException;
import java.util.List;
import org.slf4j.Logger;
@@ -27,6 +29,7 @@ import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;
public final class S3Utils {
@@ -94,4 +97,33 @@ public final class S3Utils {
s3Client.createBucket(request);
}
+
+ public static File[] getFilesToSend(File dir) throws IOException {
+ File[] files = dir.listFiles(f -> f.getName().endsWith(".test"));
+ if (files == null) {
+ throw new IOException("Either I/O error or the path used is not a directory");
+ }
+
+ if (files.length == 0) {
+ throw new IOException("Not enough files to run the test");
+ }
+
+ return files;
+ }
+
+ public static void sendFilesFromPath(S3Client s3Client, String bucketName, File[] files) {
+ LOG.debug("Putting S3 objects");
+
+ for (File file : files) {
+ LOG.debug("Trying to read file {}", file.getName());
+
+
+ PutObjectRequest putObjectRequest = PutObjectRequest.builder()
+ .bucket(bucketName)
+ .key(file.getName())
+ .build();
+
+ s3Client.putObject(putObjectRequest, file.toPath());
+ }
+ }
}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
index a1a3e9e..d3efcd6 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
@@ -18,23 +18,24 @@
package org.apache.camel.kafkaconnector.aws.v2.s3.source;
import java.io.File;
+import java.io.IOException;
import java.net.URL;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils;
import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -45,7 +46,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.createBucket;
import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.deleteBucket;
@@ -54,32 +54,39 @@ import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
-
- @FunctionalInterface
- private interface SendFunction {
- void send();
- }
-
+public class CamelSourceAWSS3ITCase extends CamelSourceTestSupport {
@RegisterExtension
public static AWSService service = AWSServiceFactory.createS3Service();
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);
private S3Client awsS3Client;
private String bucketName;
+ private String topicName;
- private volatile int received;
private int expect;
+ private File[] files;
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-aws2-s3-kafka-connector"};
}
+ @BeforeAll
+ public void setupTestFiles() throws IOException {
+ final URL resourceDir = this.getClass().getResource(".");
+ final File baseTestDir = new File(resourceDir.getFile());
+
+ files = S3Utils.getFilesToSend(baseTestDir);
+
+ expect = files.length;
+ }
+
+
@BeforeEach
public void setUp() {
+ topicName = getTopicForTest(this);
+
awsS3Client = AWSSDKClientUtils.newS3Client();
- received = 0;
bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100);
try {
@@ -90,8 +97,6 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
}
}
-
-
@AfterEach
public void tearDown() {
try {
@@ -101,83 +106,30 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
}
}
- private boolean checkRecord(ConsumerRecord<String, String> record) {
- LOG.debug("Received: {}", record.value());
- received++;
-
- if (received == expect) {
- return false;
- }
-
- return true;
- }
-
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory, SendFunction sendFunction)
- throws ExecutionException, InterruptedException {
-
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
- sendFunction.send();
-
- LOG.debug("Done putting S3S objects");
-
- LOG.debug("Creating the consumer ...");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
- LOG.debug("Created the consumer ...");
- }
-
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
- runTest(connectorPropertyFactory, this::sendFiles);
+ @Override
+ protected void produceTestData() {
+ S3Utils.sendFilesFromPath(awsS3Client, bucketName, files);
}
- private void sendFilesFromPath(File path) {
- LOG.debug("Putting S3 objects");
-
- File[] files = path.listFiles();
- if (files == null) {
- fail("Either I/O error or the path used is not a directory");
- }
-
- expect = files.length;
-
- if (files.length == 0) {
- fail("Not enough files to run the test");
- }
-
- for (File file : files) {
- LOG.debug("Trying to read file {}", file.getName());
-
- PutObjectRequest putObjectRequest = PutObjectRequest.builder()
- .bucket(bucketName)
- .key(file.getName())
- .build();
+ @Override
+ protected void verifyMessages(TestMessageConsumer<?> consumer) {
+ int received = consumer.consumedMessages().size();
- awsS3Client.putObject(putObjectRequest, file.toPath());
- }
+ assertEquals(expect, received, "Didn't process the expected amount of messages");
}
- private void sendFiles() {
- URL resourceDir = this.getClass().getResource(".");
- File baseTestDir = new File(resourceDir.getFile());
-
- sendFilesFromPath(baseTestDir);
- }
@Test
@Timeout(180)
public void testBasicSendReceive() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withConfiguration(TestS3Configuration.class.getName())
.withBucketNameOrArn(bucketName)
.withAmazonConfig(service.getConnectionProperties());
- runTest(connectorPropertyFactory);
-
- assertEquals(expect, received, "Didn't process the expected amount of messages");
+ runTest(connectorPropertyFactory, topicName, expect);
}
@Test
@@ -185,15 +137,13 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
public void testBasicSendReceiveWithMaxMessagesPerPoll() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withConfiguration(TestS3Configuration.class.getName())
.withMaxMessagesPerPoll(5)
.withBucketNameOrArn(bucketName)
.withAmazonConfig(service.getConnectionProperties());
- runTest(connectorPropertyFactory);
-
- assertEquals(expect, received, "Didn't process the expected amount of messages");
+ runTest(connectorPropertyFactory, topicName, expect);
}
@Test
@@ -201,14 +151,12 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withConfiguration(TestS3Configuration.class.getName())
.withBucketNameOrArn(bucketName)
.withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
- runTest(connectorPropertyFactory);
-
- assertEquals(expect, received, "Didn't process the expected amount of messages");
+ runTest(connectorPropertyFactory, topicName, expect);
}
@Test
@@ -218,7 +166,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withConfiguration(TestS3Configuration.class.getName())
.withUrl(bucketName)
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
@@ -227,46 +175,6 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id()))
.buildUrl();
- runTest(connectorPropertyFactory);
-
- assertEquals(expect, received, "Didn't process the expected amount of messages");
- }
-
-
-
- /* To run this test create (large) files in the a test directory
- (ie.: dd if=/dev/random of=large bs=512 count=50000)
-
- Then run it with:
-
- mvn -DskipIntegrationTests=false -Denable.slow.tests=true
- -Daws-service.s3.test.directory=/path/to/manual-s3
- -Dit.test=CamelSourceAWSS3ITCase#testBasicSendReceiveWithKafkaStyleLargeFile verify
- */
- @EnabledIfSystemProperty(named = "aws-service.s3.test.directory", matches = ".*",
- disabledReason = "Manual test that requires the user to provide a directory with files")
- @Test
- @Timeout(value = 60, unit = TimeUnit.MINUTES)
- public void testBasicSendReceiveWithKafkaStyleLargeFile() throws ExecutionException, InterruptedException {
- ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
- .basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
- .withConfiguration(TestS3Configuration.class.getName())
- .withBucketNameOrArn(bucketName)
- .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
-
- String filePath = System.getProperty("aws-service.s3.test.directory");
-
- File path = new File(filePath);
-
- runTest(connectorPropertyFactory, () -> sendFilesFromPath(path));
-
- String[] files = path.list();
- if (files == null) {
- fail("Either I/O error or the path used is not a directory");
- }
-
- assertEquals(files.length, received, "Didn't process the expected amount of messages");
+ runTest(connectorPropertyFactory, topicName, expect);
}
-
}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
new file mode 100644
index 0000000..6d7df7e
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.s3.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils;
+import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.AWSCommon;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.createBucket;
+import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.deleteBucket;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+/* To run this test, create (large) files in a test directory
+ (e.g.: dd if=/dev/random of=large.test bs=512 count=50000)
+
+ Note: they must have the .test extension.
+
+ Then run it with:
+
+ mvn -DskipIntegrationTests=false -Daws-service.s3.test.directory=/path/to/manual-s3
+ -Dit.test=CamelSourceAWSS3LargeFilesITCase verify
+*/
+@EnabledIfSystemProperty(named = "aws-service.s3.test.directory", matches = ".*",
+ disabledReason = "Manual test that requires the user to provide a directory with files")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceAWSS3LargeFilesITCase extends CamelSourceTestSupport {
+ @RegisterExtension
+ public static AWSService service = AWSServiceFactory.createS3Service();
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3LargeFilesITCase.class);
+
+ private S3Client awsS3Client;
+ private String bucketName;
+ private String topicName;
+
+ private int expect;
+ private File[] files;
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-aws2-s3-kafka-connector"};
+ }
+
+ @BeforeAll
+ public void setupTestFiles() throws IOException {
+ String filePath = System.getProperty("aws-service.s3.test.directory");
+ File baseTestDir = new File(filePath);
+
+ files = S3Utils.getFilesToSend(baseTestDir);
+
+ expect = files.length;
+ }
+
+
+ @BeforeEach
+ public void setUp() {
+ topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+ awsS3Client = AWSSDKClientUtils.newS3Client();
+ bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100);
+
+ try {
+ createBucket(awsS3Client, bucketName);
+ } catch (Exception e) {
+ LOG.error("Unable to create bucket: {}", e.getMessage(), e);
+ fail("Unable to create bucket");
+ }
+ }
+
+ @AfterEach
+ public void tearDown() {
+ try {
+ deleteBucket(awsS3Client, bucketName);
+ } catch (Exception e) {
+ LOG.warn("Unable to delete bucked: {}", e.getMessage(), e);
+ }
+ }
+
+ @Override
+ protected void produceTestData() {
+ S3Utils.sendFilesFromPath(awsS3Client, bucketName, files);
+ }
+
+ @Override
+ protected void verifyMessages(TestMessageConsumer<?> consumer) {
+ int received = consumer.consumedMessages().size();
+
+ assertEquals(expect, received, "Didn't process the expected amount of messages");
+ }
+
+
+ @Test
+ @Timeout(value = 60, unit = TimeUnit.MINUTES)
+ public void testBasicSendReceiveWithKafkaStyleLargeFile() throws ExecutionException, InterruptedException {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
+ .basic()
+ .withKafkaTopic(topicName)
+ .withConfiguration(TestS3Configuration.class.getName())
+ .withBucketNameOrArn(bucketName)
+ .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java
index e9bdf96..d4b11ac 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java
@@ -21,16 +21,15 @@ import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.AWSConfigs;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -48,16 +47,15 @@ import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
-
+public class CamelSourceAWSSQSITCase extends CamelSourceTestSupport {
@RegisterExtension
public static AWSService service = AWSServiceFactory.createSQSService();
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class);
private AWSSQSClient awssqsClient;
private String queueName;
+ private String topicName;
- private volatile int received;
private final int expect = 10;
@Override
@@ -67,12 +65,13 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
+ topicName = getTopicForTest(this);
+
awssqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient());
queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000);
// TODO: this is a work-around for CAMEL-15833
awssqsClient.createQueue(queueName);
- received = 0;
}
@AfterEach
@@ -82,32 +81,18 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
}
}
- private boolean checkRecord(ConsumerRecord<String, String> record) {
- LOG.debug("Received: {}", record.value());
- received++;
-
- if (received == expect) {
- return false;
- }
-
- return true;
- }
-
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
+ @Override
+ protected void produceTestData() {
LOG.debug("Sending SQS messages");
for (int i = 0; i < expect; i++) {
awssqsClient.send(queueName, "Source test message " + i);
}
LOG.debug("Done sending SQS messages");
+ }
- LOG.debug("Creating the consumer ...");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
- LOG.debug("Created the consumer ...");
-
+ @Override
+ protected void verifyMessages(TestMessageConsumer<?> consumer) {
+ int received = consumer.consumedMessages().size();
assertEquals(received, expect, "Didn't process the expected amount of messages");
}
@@ -116,11 +101,11 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
public void testBasicSendReceive() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withQueueOrArn(queueName)
.withAmazonConfig(service.getConnectionProperties());
- runTest(connectorPropertyFactory);
+ runTest(connectorPropertyFactory, topicName, expect);
}
// This test does not run remotely because SQS has a cool down period for
@@ -131,11 +116,11 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withQueueOrArn(queueName)
.withAmazonConfig(service.getConnectionProperties(), CamelAWSSQSPropertyFactory.KAFKA_STYLE);
- runTest(connectorPropertyFactory);
+ runTest(connectorPropertyFactory, topicName, expect);
}
// This test does not run remotely because SQS has a cool down period for
@@ -148,7 +133,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withUrl(queueName)
.append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
@@ -157,6 +142,6 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
.append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.toString()))
.buildUrl();
- runTest(connectorPropertyFactory);
+ runTest(connectorPropertyFactory, topicName, expect);
}
}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 7c9ee9b..35626a3 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -56,12 +56,26 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
* @throws Exception For test-specific exceptions
*/
public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer) throws ExecutionException, InterruptedException {
+ runTest(connectorPropertyFactory, consumer, this::produceTestData);
+ }
+
+ /**
+ * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+ *
+ * @param connectorPropertyFactory A factory for connector properties
+ * @param consumer A Kafka consumer for the test messages
+ * @param producer A producer for the test messages
+ * @throws Exception For test-specific exceptions
+ */
+ public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer,
+ FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
LOG.debug("Initialized the connector and put the data for the test execution");
- getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+// getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+ getKafkaConnectService().initializeConnector(connectorPropertyFactory);
LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
- produceTestData();
+ producer.produceMessages();
LOG.debug("Creating the Kafka consumer ...");
consumer.consumeMessages();