You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/10 10:11:43 UTC

[camel-kafka-connector] 03/14: Converted AWS source tests to use the reusable source base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 0a745841f2d6466be18ce183879478acd6c37916
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 13:50:51 2021 +0100

    Converted AWS source tests to use the reusable source base class
    
    Includes:
    - AWS v2 Kinesis source
    - AWS v2 S3 source
    - AWS v2 SQS source
---
 .../source/CamelSourceAWSKinesisITCase.java        |  54 ++-----
 .../kafkaconnector/aws/v2/s3/common/S3Utils.java   |  32 ++++
 .../aws/v2/s3/source/CamelSourceAWSS3ITCase.java   | 164 +++++----------------
 .../source/CamelSourceAWSS3LargeFilesITCase.java   | 143 ++++++++++++++++++
 .../aws/v2/sqs/source/CamelSourceAWSSQSITCase.java |  51 +++----
 .../common/test/CamelSourceTestSupport.java        |  18 ++-
 6 files changed, 261 insertions(+), 201 deletions(-)

diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
index e19d8bd..d9e5ac5 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
@@ -20,15 +20,14 @@ package org.apache.camel.kafkaconnector.aws.v2.kinesis.source;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfiguration;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
 import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -36,8 +35,6 @@ import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 
 import static org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.createStream;
@@ -47,16 +44,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
+public class CamelSourceAWSKinesisITCase extends CamelSourceTestSupport {
 
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createKinesisService();
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class);
 
     private String streamName;
     private KinesisClient kinesisClient;
+    private String topicName;
 
-    private volatile int received;
     private final int expect = 10;
 
     @Override
@@ -66,10 +62,10 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100);
 
         kinesisClient = AWSSDKClientUtils.newKinesisClient();
-        received = 0;
 
         createStream(kinesisClient, streamName);
     }
@@ -80,45 +76,28 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
         deleteStream(kinesisClient, streamName);
     }
 
-    private boolean checkRecord(ConsumerRecord<String, String> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
-    }
-
-
-
-    public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
+    protected void produceTestData() {
         putRecords(kinesisClient, streamName, expect);
-        LOG.debug("Initialized the connector and put the data for the test execution");
+    }
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
+
     @Test
     @Timeout(120)
     public void testBasicSendReceive() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withAmazonConfig(awsService.getConnectionProperties())
                 .withConfiguration(TestKinesisConfiguration.class.getName())
                 .withStreamName(streamName);
 
-        runtTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
@@ -126,12 +105,12 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
     public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withAmazonConfig(awsService.getConnectionProperties(), CamelAWSKinesisPropertyFactory.KAFKA_STYLE)
                 .withConfiguration(TestKinesisConfiguration.class.getName())
                 .withStreamName(streamName);
 
-        runtTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
@@ -139,13 +118,12 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
     public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withAmazonConfig(awsService.getConnectionProperties())
                 .withConfiguration(TestKinesisConfiguration.class.getName())
                 .withUrl(streamName)
                 .buildUrl();
 
-        runtTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
-
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java
index 25e0ec7..f1e36df 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java
@@ -17,6 +17,8 @@
 
 package org.apache.camel.kafkaconnector.aws.v2.s3.common;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -27,6 +29,7 @@ import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 public final class S3Utils {
@@ -94,4 +97,33 @@ public final class S3Utils {
 
         s3Client.createBucket(request);
     }
+
+    public static File[] getFilesToSend(File dir) throws IOException {
+        File[] files = dir.listFiles(f -> f.getName().endsWith(".test"));
+        if (files == null) {
+            throw new IOException("Either I/O error or the path used is not a directory");
+        }
+
+        if (files.length == 0) {
+            throw new IOException("Not enough files to run the test");
+        }
+
+        return files;
+    }
+
+    public static void sendFilesFromPath(S3Client s3Client, String bucketName, File[] files) {
+        LOG.debug("Putting S3 objects");
+
+        for (File file : files) {
+            LOG.debug("Trying to read file {}", file.getName());
+
+
+            PutObjectRequest putObjectRequest = PutObjectRequest.builder()
+                    .bucket(bucketName)
+                    .key(file.getName())
+                    .build();
+
+            s3Client.putObject(putObjectRequest, file.toPath());
+        }
+    }
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
index a1a3e9e..d3efcd6 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
@@ -18,23 +18,24 @@
 package org.apache.camel.kafkaconnector.aws.v2.s3.source;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils;
 import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.AWSConfigs;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
 import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -45,7 +46,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 
 import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.createBucket;
 import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.deleteBucket;
@@ -54,32 +54,39 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
-
-    @FunctionalInterface
-    private interface SendFunction {
-        void send();
-    }
-
+public class CamelSourceAWSS3ITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static AWSService service = AWSServiceFactory.createS3Service();
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);
 
     private S3Client awsS3Client;
     private String bucketName;
+    private String topicName;
 
-    private volatile int received;
     private int expect;
+    private File[] files;
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-aws2-s3-kafka-connector"};
     }
 
+    @BeforeAll
+    public void setupTestFiles() throws IOException {
+        final URL resourceDir = this.getClass().getResource(".");
+        final File baseTestDir = new File(resourceDir.getFile());
+
+        files = S3Utils.getFilesToSend(baseTestDir);
+
+        expect = files.length;
+    }
+
+
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
+
         awsS3Client = AWSSDKClientUtils.newS3Client();
-        received = 0;
         bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100);
 
         try {
@@ -90,8 +97,6 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
         }
     }
 
-
-
     @AfterEach
     public void tearDown() {
         try {
@@ -101,83 +106,30 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
         }
     }
 
-    private boolean checkRecord(ConsumerRecord<String, String> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
-    }
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, SendFunction sendFunction)
-            throws ExecutionException, InterruptedException {
-
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        sendFunction.send();
-
-        LOG.debug("Done putting S3S objects");
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
-    }
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        runTest(connectorPropertyFactory, this::sendFiles);
+    @Override
+    protected void produceTestData() {
+        S3Utils.sendFilesFromPath(awsS3Client, bucketName, files);
     }
 
-    private void sendFilesFromPath(File path) {
-        LOG.debug("Putting S3 objects");
-
-        File[] files = path.listFiles();
-        if (files == null) {
-            fail("Either I/O error or the path used is not a directory");
-        }
-
-        expect = files.length;
-
-        if (files.length == 0) {
-            fail("Not enough files to run the test");
-        }
-
-        for (File file : files) {
-            LOG.debug("Trying to read file {}", file.getName());
-
-            PutObjectRequest putObjectRequest = PutObjectRequest.builder()
-                    .bucket(bucketName)
-                    .key(file.getName())
-                    .build();
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
-            awsS3Client.putObject(putObjectRequest, file.toPath());
-        }
+        assertEquals(expect, received,  "Didn't process the expected amount of messages");
     }
 
-    private void sendFiles() {
-        URL resourceDir = this.getClass().getResource(".");
-        File baseTestDir = new File(resourceDir.getFile());
-
-        sendFilesFromPath(baseTestDir);
-    }
 
     @Test
     @Timeout(180)
     public void testBasicSendReceive() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withConfiguration(TestS3Configuration.class.getName())
                 .withBucketNameOrArn(bucketName)
                 .withAmazonConfig(service.getConnectionProperties());
 
-        runTest(connectorPropertyFactory);
-
-        assertEquals(expect, received,  "Didn't process the expected amount of messages");
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
@@ -185,15 +137,13 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
     public void testBasicSendReceiveWithMaxMessagesPerPoll() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withConfiguration(TestS3Configuration.class.getName())
                 .withMaxMessagesPerPoll(5)
                 .withBucketNameOrArn(bucketName)
                 .withAmazonConfig(service.getConnectionProperties());
 
-        runTest(connectorPropertyFactory);
-
-        assertEquals(expect, received,  "Didn't process the expected amount of messages");
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
@@ -201,14 +151,12 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
     public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withConfiguration(TestS3Configuration.class.getName())
                 .withBucketNameOrArn(bucketName)
                 .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
 
-        runTest(connectorPropertyFactory);
-
-        assertEquals(expect, received,  "Didn't process the expected amount of messages");
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
@@ -218,7 +166,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
 
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withConfiguration(TestS3Configuration.class.getName())
                 .withUrl(bucketName)
                     .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
@@ -227,46 +175,6 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
                     .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id()))
                 .buildUrl();
 
-        runTest(connectorPropertyFactory);
-
-        assertEquals(expect, received,  "Didn't process the expected amount of messages");
-    }
-
-
-
-    /* To run this test create (large) files in the a test directory
-        (ie.: dd if=/dev/random of=large bs=512 count=50000)
-
-        Then run it with:
-
-        mvn -DskipIntegrationTests=false -Denable.slow.tests=true
-            -Daws-service.s3.test.directory=/path/to/manual-s3
-            -Dit.test=CamelSourceAWSS3ITCase#testBasicSendReceiveWithKafkaStyleLargeFile verify
-     */
-    @EnabledIfSystemProperty(named = "aws-service.s3.test.directory", matches = ".*",
-            disabledReason = "Manual test that requires the user to provide a directory with files")
-    @Test
-    @Timeout(value = 60, unit = TimeUnit.MINUTES)
-    public void testBasicSendReceiveWithKafkaStyleLargeFile() throws ExecutionException, InterruptedException {
-        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
-                .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                .withConfiguration(TestS3Configuration.class.getName())
-                .withBucketNameOrArn(bucketName)
-                .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
-
-        String filePath = System.getProperty("aws-service.s3.test.directory");
-
-        File path = new File(filePath);
-
-        runTest(connectorPropertyFactory, () -> sendFilesFromPath(path));
-
-        String[] files = path.list();
-        if (files == null) {
-            fail("Either I/O error or the path used is not a directory");
-        }
-
-        assertEquals(files.length, received, "Didn't process the expected amount of messages");
+        runTest(connectorPropertyFactory, topicName, expect);
     }
-
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
new file mode 100644
index 0000000..6d7df7e
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.s3.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils;
+import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.AWSCommon;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.createBucket;
+import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.deleteBucket;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+/* To run this test create (large) files in the a test directory
+   (ie.: dd if=/dev/random of=large.test bs=512 count=50000)
+
+   Note: they must have the .test extension.
+
+   Then run it with:
+
+   mvn -DskipIntegrationTests=false -Daws-service.s3.test.directory=/path/to/manual-s3
+       -Dit.test=CamelSourceAWSS3LargeFilesITCase verify
+*/
+@EnabledIfSystemProperty(named = "aws-service.s3.test.directory", matches = ".*",
+        disabledReason = "Manual test that requires the user to provide a directory with files")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceAWSS3LargeFilesITCase extends CamelSourceTestSupport {
+    @RegisterExtension
+    public static AWSService service = AWSServiceFactory.createS3Service();
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3LargeFilesITCase.class);
+
+    private S3Client awsS3Client;
+    private String bucketName;
+    private String topicName;
+
+    private int expect;
+    private File[] files;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-aws2-s3-kafka-connector"};
+    }
+
+    @BeforeAll
+    public void setupTestFiles() throws IOException {
+        String filePath = System.getProperty("aws-service.s3.test.directory");
+        File baseTestDir = new File(filePath);
+
+        files = S3Utils.getFilesToSend(baseTestDir);
+
+        expect = files.length;
+    }
+
+
+    @BeforeEach
+    public void setUp() {
+        topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+        awsS3Client = AWSSDKClientUtils.newS3Client();
+        bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100);
+
+        try {
+            createBucket(awsS3Client, bucketName);
+        } catch (Exception e) {
+            LOG.error("Unable to create bucket: {}", e.getMessage(), e);
+            fail("Unable to create bucket");
+        }
+    }
+
+    @AfterEach
+    public void tearDown() {
+        try {
+            deleteBucket(awsS3Client, bucketName);
+        } catch (Exception e) {
+            LOG.warn("Unable to delete bucked: {}", e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void produceTestData() {
+        S3Utils.sendFilesFromPath(awsS3Client, bucketName, files);
+    }
+
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
+
+        assertEquals(expect, received,  "Didn't process the expected amount of messages");
+    }
+
+
+    @Test
+    @Timeout(value = 60, unit = TimeUnit.MINUTES)
+    public void testBasicSendReceiveWithKafkaStyleLargeFile() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withConfiguration(TestS3Configuration.class.getName())
+                .withBucketNameOrArn(bucketName)
+                .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java
index e9bdf96..d4b11ac 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java
@@ -21,16 +21,15 @@ import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.AWSConfigs;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
 import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -48,16 +47,15 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
-
+public class CamelSourceAWSSQSITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static AWSService service = AWSServiceFactory.createSQSService();
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class);
 
     private AWSSQSClient awssqsClient;
     private String queueName;
+    private String topicName;
 
-    private volatile int received;
     private final int expect = 10;
 
     @Override
@@ -67,12 +65,13 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
+
         awssqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient());
         queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000);
 
         // TODO: this is a work-around for CAMEL-15833
         awssqsClient.createQueue(queueName);
-        received = 0;
     }
 
     @AfterEach
@@ -82,32 +81,18 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
         }
     }
 
-    private boolean checkRecord(ConsumerRecord<String, String> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
-    }
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
+    @Override
+    protected void produceTestData() {
         LOG.debug("Sending SQS messages");
         for (int i = 0; i < expect; i++) {
             awssqsClient.send(queueName, "Source test message " + i);
         }
         LOG.debug("Done sending SQS messages");
+    }
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
-
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
@@ -116,11 +101,11 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
     public void testBasicSendReceive() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withQueueOrArn(queueName)
                 .withAmazonConfig(service.getConnectionProperties());
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     // This test does not run remotely because SQS has a cool down period for
@@ -131,11 +116,11 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
     public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withQueueOrArn(queueName)
                 .withAmazonConfig(service.getConnectionProperties(), CamelAWSSQSPropertyFactory.KAFKA_STYLE);
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     // This test does not run remotely because SQS has a cool down period for
@@ -148,7 +133,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
 
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withUrl(queueName)
                 .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
                 .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
@@ -157,6 +142,6 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
                 .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.toString()))
                 .buildUrl();
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 7c9ee9b..35626a3 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -56,12 +56,26 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
      * @throws Exception For test-specific exceptions
      */
     public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer) throws ExecutionException, InterruptedException {
+        runTest(connectorPropertyFactory, consumer, this::produceTestData);
+    }
+
+    /**
+     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param consumer A Kafka consumer consumer for the test messages
+     * @param producer A producer for the test messages
+     * @throws Exception For test-specific exceptions
+     */
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer,
+                        FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         LOG.debug("Initialized the connector and put the data for the test execution");
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+//        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
         LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
-        produceTestData();
+        producer.produceMessages();
 
         LOG.debug("Creating the Kafka consumer ...");
         consumer.consumeMessages();