You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2020/12/16 11:15:01 UTC
[camel-kafka-connector] branch master updated: Added new S3 v2
manual test that supports testing w/ large files
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 1b047d3 Added new S3 v2 manual test that supports testing w/ large files
1b047d3 is described below
commit 1b047d3cc23c6d2a684cd8fc85a920c5ae0962ce
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Dec 16 10:19:02 2020 +0100
Added new S3 v2 manual test that supports testing w/ large files
---
.../aws/v2/s3/source/CamelSourceAWSS3ITCase.java | 102 +++++++++++++++++----
1 file changed, 83 insertions(+), 19 deletions(-)
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
index 6ad128b..5f571b7 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.net.URL;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSDKClientUtils;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
@@ -54,6 +55,11 @@ import static org.junit.jupiter.api.Assertions.fail;
public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);
+ @FunctionalInterface
+ private interface SendFunction {
+ void send();
+ }
+
@RegisterExtension
AWSService<S3Client> service = AWSServiceFactory.createS3Service();
@@ -61,7 +67,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
private String bucketName;
private volatile int received;
- private final int expect = 10;
+ private int expect;
@Override
protected String[] getConnectorsInTest() {
@@ -106,35 +112,53 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
return true;
}
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+ public void runTest(ConnectorPropertyFactory connectorPropertyFactory, SendFunction sendFunction)
+ throws ExecutionException, InterruptedException {
+
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);
- LOG.debug("Putting S3 objects");
- for (int i = 0; i < expect; i++) {
- String name = "file" + i + ".test";
- LOG.debug("Trying to read file {}", name);
- URL fileResource = this.getClass().getResource(name);
- LOG.debug("Found file at {}", fileResource.getPath());
- String file = fileResource.getFile();
- LOG.debug("Using file {}", file);
-
- LOG.trace("Putting file {}", file);
- PutObjectRequest putObjectRequest = PutObjectRequest.builder()
- .bucket(bucketName)
- .key(name)
- .build();
+ sendFunction.send();
- awsS3Client.putObject(putObjectRequest, new File(file).toPath());
- }
LOG.debug("Done putting S3S objects");
LOG.debug("Creating the consumer ...");
KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
LOG.debug("Created the consumer ...");
+ }
+
+ public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+ runTest(connectorPropertyFactory, this::sendFiles);
+ }
+
+ private void sendFilesFromPath(File path) {
+ LOG.debug("Putting S3 objects");
+
+ File[] files = path.listFiles();
+ expect = files.length;
+
+ if (files.length == 0) {
+ fail("Not enough files to run the test");
+ }
+
+ for (File file : files) {
+ LOG.debug("Trying to read file {}", file.getName());
+
+ PutObjectRequest putObjectRequest = PutObjectRequest.builder()
+ .bucket(bucketName)
+ .key(file.getName())
+ .build();
+
+ awsS3Client.putObject(putObjectRequest, file.toPath());
+ }
+ }
+
+ private void sendFiles() {
+ URL resourceDir = this.getClass().getResource(".");
+ File baseTestDir = new File(resourceDir.getFile());
- assertEquals(received, expect, "Didn't process the expected amount of messages");
+ sendFilesFromPath(baseTestDir);
}
@Test
@@ -148,6 +172,8 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
.withAmazonConfig(service.getConnectionProperties());
runTest(connectorPropertyFactory);
+
+ assertEquals(expect, received, "Didn't process the expected amount of messages");
}
@Test
@@ -162,6 +188,8 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
.withAmazonConfig(service.getConnectionProperties());
runTest(connectorPropertyFactory);
+
+ assertEquals(expect, received, "Didn't process the expected amount of messages");
}
@Test
@@ -175,6 +203,8 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
.withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
runTest(connectorPropertyFactory);
+
+ assertEquals(expect, received, "Didn't process the expected amount of messages");
}
@Test
@@ -194,6 +224,40 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
.buildUrl();
runTest(connectorPropertyFactory);
+
+ assertEquals(expect, received, "Didn't process the expected amount of messages");
+ }
+
+
+
+ /* To run this test create (large) files in the a test directory
+ (ie.: dd if=/dev/random of=large bs=512 count=50000)
+
+ Then run it with:
+
+ mvn -DskipIntegrationTests=false -Denable.slow.tests=true
+ -Daws-service.s3.test.directory=/path/to/manual-s3
+ -Dit.test=CamelSourceAWSS3ITCase#testBasicSendReceiveWithKafkaStyleLargeFile verify
+ */
+ @EnabledIfSystemProperty(named = "aws-service.s3.test.directory", matches = ".*",
+ disabledReason = "Manual test that requires the user to provide a directory with files")
+ @Test
+ @Timeout(value = 60, unit = TimeUnit.MINUTES)
+ public void testBasicSendReceiveWithKafkaStyleLargeFile() throws ExecutionException, InterruptedException {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
+ .basic()
+ .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withConfiguration(TestS3Configuration.class.getName())
+ .withBucketNameOrArn(bucketName)
+ .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
+
+ String filePath = System.getProperty("aws-service.s3.test.directory");
+
+ File path = new File(filePath);
+
+ runTest(connectorPropertyFactory, () -> sendFilesFromPath(path));
+
+ assertEquals(path.list().length, received, "Didn't process the expected amount of messages");
}
}