You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2020/12/16 11:15:01 UTC

[camel-kafka-connector] branch master updated: Added new S3 v2 manual test that supports testing w/ large files

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b047d3  Added new S3 v2 manual test that supports testing w/ large files
1b047d3 is described below

commit 1b047d3cc23c6d2a684cd8fc85a920c5ae0962ce
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Dec 16 10:19:02 2020 +0100

    Added new S3 v2 manual test that supports testing w/ large files
---
 .../aws/v2/s3/source/CamelSourceAWSS3ITCase.java   | 102 +++++++++++++++++----
 1 file changed, 83 insertions(+), 19 deletions(-)

diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
index 6ad128b..5f571b7 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.net.URL;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSDKClientUtils;
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
@@ -54,6 +55,11 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);
 
+    @FunctionalInterface
+    private interface SendFunction {
+        void send();
+    }
+
     @RegisterExtension
     AWSService<S3Client> service = AWSServiceFactory.createS3Service();
 
@@ -61,7 +67,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
     private String bucketName;
 
     private volatile int received;
-    private final int expect = 10;
+    private int expect;
 
     @Override
     protected String[] getConnectorsInTest() {
@@ -106,35 +112,53 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
         return true;
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, SendFunction sendFunction)
+            throws ExecutionException, InterruptedException {
+
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
-        LOG.debug("Putting S3 objects");
-        for (int i = 0; i < expect; i++) {
-            String name = "file" + i + ".test";
-            LOG.debug("Trying to read file {}", name);
-            URL fileResource = this.getClass().getResource(name);
-            LOG.debug("Found file at {}", fileResource.getPath());
-            String file = fileResource.getFile();
-            LOG.debug("Using file {}", file);
-
-            LOG.trace("Putting file {}", file);
-            PutObjectRequest putObjectRequest = PutObjectRequest.builder()
-                    .bucket(bucketName)
-                    .key(name)
-                    .build();
+        sendFunction.send();
 
-            awsS3Client.putObject(putObjectRequest, new File(file).toPath());
-        }
         LOG.debug("Done putting S3S objects");
 
         LOG.debug("Creating the consumer ...");
         KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
         kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
         LOG.debug("Created the consumer ...");
+    }
+
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        runTest(connectorPropertyFactory, this::sendFiles);
+    }
+
+    private void sendFilesFromPath(File path) {
+        LOG.debug("Putting S3 objects");
+
+        File[] files = path.listFiles();
+        expect = files.length;
+
+        if (files.length == 0) {
+            fail("Not enough files to run the test");
+        }
+
+        for (File file : files) {
+            LOG.debug("Trying to read file {}", file.getName());
+
+            PutObjectRequest putObjectRequest = PutObjectRequest.builder()
+                    .bucket(bucketName)
+                    .key(file.getName())
+                    .build();
+
+            awsS3Client.putObject(putObjectRequest, file.toPath());
+        }
+    }
+
+    private void sendFiles() {
+        URL resourceDir = this.getClass().getResource(".");
+        File baseTestDir = new File(resourceDir.getFile());
 
-        assertEquals(received, expect, "Didn't process the expected amount of messages");
+        sendFilesFromPath(baseTestDir);
     }
 
     @Test
@@ -148,6 +172,8 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
                 .withAmazonConfig(service.getConnectionProperties());
 
         runTest(connectorPropertyFactory);
+
+        assertEquals(expect, received,  "Didn't process the expected amount of messages");
     }
 
     @Test
@@ -162,6 +188,8 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
                 .withAmazonConfig(service.getConnectionProperties());
 
         runTest(connectorPropertyFactory);
+
+        assertEquals(expect, received,  "Didn't process the expected amount of messages");
     }
 
     @Test
@@ -175,6 +203,8 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
                 .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
 
         runTest(connectorPropertyFactory);
+
+        assertEquals(expect, received,  "Didn't process the expected amount of messages");
     }
 
     @Test
@@ -194,6 +224,40 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
                 .buildUrl();
 
         runTest(connectorPropertyFactory);
+
+        assertEquals(expect, received,  "Didn't process the expected amount of messages");
+    }
+
+
+
+    /* To run this test create (large) files in the a test directory
+        (ie.: dd if=/dev/random of=large bs=512 count=50000)
+
+        Then run it with:
+
+        mvn -DskipIntegrationTests=false -Denable.slow.tests=true
+            -Daws-service.s3.test.directory=/path/to/manual-s3
+            -Dit.test=CamelSourceAWSS3ITCase#testBasicSendReceiveWithKafkaStyleLargeFile verify
+     */
+    @EnabledIfSystemProperty(named = "aws-service.s3.test.directory", matches = ".*",
+            disabledReason = "Manual test that requires the user to provide a directory with files")
+    @Test
+    @Timeout(value = 60, unit = TimeUnit.MINUTES)
+    public void testBasicSendReceiveWithKafkaStyleLargeFile() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
+                .basic()
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withConfiguration(TestS3Configuration.class.getName())
+                .withBucketNameOrArn(bucketName)
+                .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
+
+        String filePath = System.getProperty("aws-service.s3.test.directory");
+
+        File path = new File(filePath);
+
+        runTest(connectorPropertyFactory, () -> sendFilesFromPath(path));
+
+        assertEquals(path.list().length, received, "Didn't process the expected amount of messages");
     }
 
 }