You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/10 10:11:40 UTC

[camel-kafka-connector] branch camel-master updated (00d7241 -> bc99e54)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from 00d7241  Added camel-paho-mqtt5 to generated connectors json
     new b276cf2  Added missing slow tests filter to avoid trying to run the AWS v2 test on GH actions
     new fc34cba  Added a reusable base class for the source tests
     new 0a74584  Converted AWS source tests to use the reusable source base class
     new 0fa2149  Converted the Azure storage queue source test case to use the reusable source base class
     new fe801e0  Converted the Cassandra source test case to use the reusable source base class
     new e6e7f97  Converted the MongoDB source test case to use the reusable source base class
     new b55096e  Converted the RabbitMQ source test case to use the reusable source base class
     new e95b9fb  Converted the SJMS2 source test cases to use the reusable source base class
     new f2f51b1  Converted the SQL source test case to use the reusable source base class
     new 550d1a2  Converted the SSH source test case to use the reusable source base class
     new de00cf1  Converted the Syslog source test case to use the reusable source base class
     new ba38537  Converted the Timer source test case to use the reusable source base class
     new 2d018cb  Disabled Azure storage blob tests due to GH issue #997
     new bc99e54  Disabled Azure storage queue sink tests due to GH issue #976

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java     |   2 +
 .../aws/v2/iam/sink/CamelSinkAWSIAMITCase.java     |   2 +
 .../v2/kinesis/sink/CamelSinkAWSKinesisITCase.java |   1 -
 .../source/CamelSourceAWSKinesisITCase.java        |  54 ++-----
 .../kafkaconnector/aws/v2/s3/common/S3Utils.java   |  32 ++++
 .../aws/v2/s3/sink/CamelSinkAWSS3ITCase.java       |   2 +
 .../aws/v2/s3/source/CamelSourceAWSS3ITCase.java   | 164 +++++----------------
 .../CamelSourceAWSS3LargeFilesITCase.java}         | 135 ++++++++---------
 .../aws/v2/sqs/source/CamelSourceAWSSQSITCase.java |  51 +++----
 .../blob/sink/CamelSinkAzureStorageBlobITCase.java |   2 +
 .../sink/CamelSinkAzureStorageQueueITCase.java     |   2 +
 .../source/CamelSourceAzureStorageQueueITCase.java |  57 +++----
 .../source/CamelSourceCassandraITCase.java         |  60 +++-----
 ...ducer.java => AbstractTestMessageConsumer.java} |  55 ++++---
 .../common/test/CamelSourceTestSupport.java        | 126 ++++++++++++++++
 ...geProducer.java => IntegerMessageConsumer.java} |  13 +-
 ...ageProducer.java => StringMessageConsumer.java} |  13 +-
 ...ssageProducer.java => TestMessageConsumer.java} |  10 +-
 .../mongodb/source/CamelSourceMongoDBITCase.java   |  49 +++---
 .../rabbitmq/source/RabbitMQSourceITCase.java      |  64 ++++----
 .../sjms2/source/CamelSourceJMSITCase.java         | 127 ++++++----------
 .../source/CamelSourceJMSWithAggregation.java      | 110 ++++++++------
 .../sql/source/CamelSourceSQLITCase.java           |  47 +++---
 .../ssh/sink/CamelSinkSshITCase.java               |   3 +-
 .../ssh/source/CamelSourceSshITCase.java           |  47 ++----
 .../syslog/source/CamelSourceSyslogITCase.java     | 124 ++++++++--------
 .../timer/source/CamelSourceTimerITCase.java       |  53 +++----
 27 files changed, 656 insertions(+), 749 deletions(-)
 copy tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/{sink/CamelSinkAWSS3ITCase.java => source/CamelSourceAWSS3LargeFilesITCase.java} (52%)
 copy tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/{AbstractTestMessageProducer.java => AbstractTestMessageConsumer.java} (54%)
 create mode 100644 tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
 copy tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/{TestMessageProducer.java => IntegerMessageConsumer.java} (68%)
 copy tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/{TestMessageProducer.java => StringMessageConsumer.java} (68%)
 copy tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/{FunctionalTestMessageProducer.java => TestMessageConsumer.java} (80%)


[camel-kafka-connector] 07/14: Converted the RabbitMQ source test case to use the reusable source base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit b55096ed893a0d74ec32821b510df2bbd71c68b0
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 20:03:43 2021 +0100

    Converted the RabbitMQ source test case to use the reusable source base class
---
 .../rabbitmq/source/RabbitMQSourceITCase.java      | 64 +++++++++-------------
 1 file changed, 25 insertions(+), 39 deletions(-)

diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
index 15950e7..4ef2ae2 100644
--- a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
@@ -18,14 +18,12 @@ package org.apache.camel.kafkaconnector.rabbitmq.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
 import org.apache.camel.test.infra.rabbitmq.services.RabbitMQService;
 import org.apache.camel.test.infra.rabbitmq.services.RabbitMQServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -37,7 +35,7 @@ import org.slf4j.LoggerFactory;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class RabbitMQSourceITCase extends AbstractKafkaTest {
+public class RabbitMQSourceITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static RabbitMQService rabbitmqService = RabbitMQServiceFactory.createService();
 
@@ -45,7 +43,7 @@ public class RabbitMQSourceITCase extends AbstractKafkaTest {
     private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import";
 
     private RabbitMQClient rabbitMQClient;
-    private int received;
+    private String topicName;
     private final int expect = 10;
 
     @Override
@@ -55,55 +53,43 @@ public class RabbitMQSourceITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        received = 0;
+        topicName = getTopicForTest(this);
         rabbitMQClient =  new RabbitMQClient(rabbitmqService.getAmqpUrl());
-    }
-
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
 
-        return true;
-    }
-
-    public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
         rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE);
+    }
 
+    @Override
+    protected void produceTestData() {
         for (int i = 0; i < expect; i++) {
             rabbitMQClient.send(DEFAULT_RABBITMQ_QUEUE, "Test string message");
         }
+    }
 
-        LOG.debug("Creating the kafka consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the kafka consumer ...");
-
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
+
     @Test
     @Timeout(90)
     public void testSource() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withUrl("")
-                .append("username", rabbitmqService.connectionProperties().username())
-                .append("password", rabbitmqService.connectionProperties().password())
-                .append("autoDelete", "false")
-                .append("queue", DEFAULT_RABBITMQ_QUEUE)
-                .append("skipExchangeDeclare", "true")
-                .append("skipQueueBind", "true")
-                .append("hostname", rabbitmqService.connectionProperties().hostname())
-                .append("portNumber", rabbitmqService.connectionProperties().port())
-                .buildUrl();
-
-        runBasicStringTest(factory);
+                    .append("username", rabbitmqService.connectionProperties().username())
+                    .append("password", rabbitmqService.connectionProperties().password())
+                    .append("autoDelete", "false")
+                    .append("queue", DEFAULT_RABBITMQ_QUEUE)
+                    .append("skipExchangeDeclare", "true")
+                    .append("skipQueueBind", "true")
+                    .append("hostname", rabbitmqService.connectionProperties().hostname())
+                    .append("portNumber", rabbitmqService.connectionProperties().port())
+                    .buildUrl();
+
+        runTest(factory, topicName, expect);
     }
 }


[camel-kafka-connector] 06/14: Converted the MongoDB source test case to use the reusable source base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e6e7f97b852a81764ce71f359d19bb5f39a24fa0
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 20:00:03 2021 +0100

    Converted the MongoDB source test case to use the reusable source base class
---
 .../mongodb/source/CamelSourceMongoDBITCase.java   | 49 +++++++++-------------
 1 file changed, 19 insertions(+), 30 deletions(-)

diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java
index 9260f05..5c4fa5d 100644
--- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java
+++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java
@@ -26,46 +26,41 @@ import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.CreateCollectionOptions;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.test.infra.mongodb.services.MongoDBService;
 import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.bson.Document;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Disabled(value = "Disabled due to issue #974")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceMongoDBITCase extends AbstractKafkaTest {
+public class CamelSourceMongoDBITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static MongoDBService mongoDBService = MongoDBServiceFactory.createService();
 
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceMongoDBITCase.class);
-
     private MongoClient mongoClient;
+    private String topicName;
 
     private final int expect = 10;
-    private int received;
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[]{"camel-mongodb-kafka-connector"};
     }
 
-    @BeforeEach
-    public void setUp() {
+    @BeforeAll
+    public void setUpDb() {
         mongoClient = MongoClients.create(mongoDBService.getReplicaSetUrl());
 
         MongoDatabase database = mongoClient.getDatabase("testDatabase");
@@ -95,25 +90,19 @@ public class CamelSourceMongoDBITCase extends AbstractKafkaTest {
         collection.insertMany(documents);
     }
 
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+    @Override
+    protected void produceTestData() {
+        // NO-OP: static data already produced on the DB setup method
+    }
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
@@ -125,13 +114,13 @@ public class CamelSourceMongoDBITCase extends AbstractKafkaTest {
                 mongoDBService.getReplicaSetUrl());
 
         ConnectorPropertyFactory factory = CamelMongoDBPropertyFactory.basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withConnectionBean("mongo",
                         BasicConnectorPropertyFactory.classRef(connectionBeanRef))
                 .withDatabase("testDatabase")
                 .withCollection("testCollection")
                 .withCreateCollection(true);
 
-        runTest(factory);
+        runTest(factory, topicName, expect);
     }
 }


[camel-kafka-connector] 14/14: Disabled Azure storage queue sink tests due to GH issue #976

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit bc99e5443ae3d7eccfdfc504940417114f1f440e
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Tue Feb 9 21:41:16 2021 +0100

    Disabled Azure storage queue sink tests due to GH issue #976
---
 .../azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java      | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
index ef12c18..1d5eb05 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -33,6 +33,7 @@ import org.apache.camel.test.infra.azure.common.services.AzureService;
 import org.apache.camel.test.infra.azure.storage.queue.services.AzureStorageQueueServiceFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
@@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
+@Disabled(value = "Disabled due to issue #976")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkAzureStorageQueueITCase extends CamelSinkTestSupport {
     @RegisterExtension


[camel-kafka-connector] 12/14: Converted the Timer source test case to use the reusable source base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit ba38537531ac192a13d05a2c7369049a0e21e2ae
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Feb 9 09:54:30 2021 +0100

    Converted the Timer source test case to use the reusable source base class
---
 .../timer/source/CamelSourceTimerITCase.java       | 53 ++++++++--------------
 1 file changed, 18 insertions(+), 35 deletions(-)

diff --git a/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java b/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
index bc02984..cedb12d 100644
--- a/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
+++ b/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
@@ -19,17 +19,12 @@ package org.apache.camel.kafkaconnector.timer.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -38,11 +33,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  * messages
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceTimerITCase extends AbstractKafkaTest {
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceTimerITCase.class);
-
-    private int received;
+public class CamelSourceTimerITCase extends CamelSourceTestSupport {
     private final int expect = 10;
+    private String topicName;
 
     @Override
     protected String[] getConnectorsInTest() {
@@ -51,53 +44,43 @@ public class CamelSourceTimerITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        received = 0;
+        topicName = getTopicForTest(this);
     }
 
-    private boolean checkRecord(ConsumerRecord<String, String> record) {
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
+    @Override
+    protected void produceTestData() {
+        // NO-OP
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
-        assertEquals(received, expect);
+        assertEquals(expect, received, "Did not receive as many messages as expected");
     }
 
     @Test
-    @Timeout(90)
+    @Timeout(30)
     public void testLaunchConnector() throws ExecutionException, InterruptedException {
         CamelTimerPropertyFactory connectorPropertyFactory = CamelTimerPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withTimerName("launchTest")
                 .withRepeatCount(expect);
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
-    @Timeout(90)
+    @Timeout(30)
     public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException {
         CamelTimerPropertyFactory connectorPropertyFactory = CamelTimerPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withUrl("launchTestUsingUrl")
                     .append("repeatCount", expect)
                     .buildUrl();
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }


[camel-kafka-connector] 03/14: Converted AWS source tests to use the reusable source base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 0a745841f2d6466be18ce183879478acd6c37916
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 13:50:51 2021 +0100

    Converted AWS source tests to use the reusable source base class
    
    Includes:
    - AWS v2 Kinesis source
    - AWS v2 S3 source
    - AWS v2 SQS source
---
 .../source/CamelSourceAWSKinesisITCase.java        |  54 ++-----
 .../kafkaconnector/aws/v2/s3/common/S3Utils.java   |  32 ++++
 .../aws/v2/s3/source/CamelSourceAWSS3ITCase.java   | 164 +++++----------------
 .../source/CamelSourceAWSS3LargeFilesITCase.java   | 143 ++++++++++++++++++
 .../aws/v2/sqs/source/CamelSourceAWSSQSITCase.java |  51 +++----
 .../common/test/CamelSourceTestSupport.java        |  18 ++-
 6 files changed, 261 insertions(+), 201 deletions(-)

diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
index e19d8bd..d9e5ac5 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
@@ -20,15 +20,14 @@ package org.apache.camel.kafkaconnector.aws.v2.kinesis.source;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfiguration;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
 import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -36,8 +35,6 @@ import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 
 import static org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.createStream;
@@ -47,16 +44,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
+public class CamelSourceAWSKinesisITCase extends CamelSourceTestSupport {
 
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createKinesisService();
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class);
 
     private String streamName;
     private KinesisClient kinesisClient;
+    private String topicName;
 
-    private volatile int received;
     private final int expect = 10;
 
     @Override
@@ -66,10 +62,10 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100);
 
         kinesisClient = AWSSDKClientUtils.newKinesisClient();
-        received = 0;
 
         createStream(kinesisClient, streamName);
     }
@@ -80,45 +76,28 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
         deleteStream(kinesisClient, streamName);
     }
 
-    private boolean checkRecord(ConsumerRecord<String, String> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
-    }
-
-
-
-    public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
+    protected void produceTestData() {
         putRecords(kinesisClient, streamName, expect);
-        LOG.debug("Initialized the connector and put the data for the test execution");
+    }
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
+
     @Test
     @Timeout(120)
     public void testBasicSendReceive() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withAmazonConfig(awsService.getConnectionProperties())
                 .withConfiguration(TestKinesisConfiguration.class.getName())
                 .withStreamName(streamName);
 
-        runtTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
@@ -126,12 +105,12 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
     public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withAmazonConfig(awsService.getConnectionProperties(), CamelAWSKinesisPropertyFactory.KAFKA_STYLE)
                 .withConfiguration(TestKinesisConfiguration.class.getName())
                 .withStreamName(streamName);
 
-        runtTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
@@ -139,13 +118,12 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
     public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withAmazonConfig(awsService.getConnectionProperties())
                 .withConfiguration(TestKinesisConfiguration.class.getName())
                 .withUrl(streamName)
                 .buildUrl();
 
-        runtTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
-
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java
index 25e0ec7..f1e36df 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/common/S3Utils.java
@@ -17,6 +17,8 @@
 
 package org.apache.camel.kafkaconnector.aws.v2.s3.common;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -27,6 +29,7 @@ import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 public final class S3Utils {
@@ -94,4 +97,33 @@ public final class S3Utils {
 
         s3Client.createBucket(request);
     }
+
+    public static File[] getFilesToSend(File dir) throws IOException {
+        File[] files = dir.listFiles(f -> f.getName().endsWith(".test"));
+        if (files == null) {
+            throw new IOException("Either I/O error or the path used is not a directory");
+        }
+
+        if (files.length == 0) {
+            throw new IOException("Not enough files to run the test");
+        }
+
+        return files;
+    }
+
+    public static void sendFilesFromPath(S3Client s3Client, String bucketName, File[] files) {
+        LOG.debug("Putting S3 objects");
+
+        for (File file : files) {
+            LOG.debug("Trying to read file {}", file.getName());
+
+
+            PutObjectRequest putObjectRequest = PutObjectRequest.builder()
+                    .bucket(bucketName)
+                    .key(file.getName())
+                    .build();
+
+            s3Client.putObject(putObjectRequest, file.toPath());
+        }
+    }
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
index a1a3e9e..d3efcd6 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3ITCase.java
@@ -18,23 +18,24 @@
 package org.apache.camel.kafkaconnector.aws.v2.s3.source;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils;
 import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.AWSConfigs;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
 import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -45,7 +46,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 
 import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.createBucket;
 import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.deleteBucket;
@@ -54,32 +54,39 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
-
-    @FunctionalInterface
-    private interface SendFunction {
-        void send();
-    }
-
+public class CamelSourceAWSS3ITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static AWSService service = AWSServiceFactory.createS3Service();
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);
 
     private S3Client awsS3Client;
     private String bucketName;
+    private String topicName;
 
-    private volatile int received;
     private int expect;
+    private File[] files;
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-aws2-s3-kafka-connector"};
     }
 
+    @BeforeAll
+    public void setupTestFiles() throws IOException {
+        final URL resourceDir = this.getClass().getResource(".");
+        final File baseTestDir = new File(resourceDir.getFile());
+
+        files = S3Utils.getFilesToSend(baseTestDir);
+
+        expect = files.length;
+    }
+
+
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
+
         awsS3Client = AWSSDKClientUtils.newS3Client();
-        received = 0;
         bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100);
 
         try {
@@ -90,8 +97,6 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
         }
     }
 
-
-
     @AfterEach
     public void tearDown() {
         try {
@@ -101,83 +106,30 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
         }
     }
 
-    private boolean checkRecord(ConsumerRecord<String, String> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
-    }
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, SendFunction sendFunction)
-            throws ExecutionException, InterruptedException {
-
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        sendFunction.send();
-
-        LOG.debug("Done putting S3S objects");
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
-    }
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        runTest(connectorPropertyFactory, this::sendFiles);
+    @Override
+    protected void produceTestData() {
+        S3Utils.sendFilesFromPath(awsS3Client, bucketName, files);
     }
 
-    private void sendFilesFromPath(File path) {
-        LOG.debug("Putting S3 objects");
-
-        File[] files = path.listFiles();
-        if (files == null) {
-            fail("Either I/O error or the path used is not a directory");
-        }
-
-        expect = files.length;
-
-        if (files.length == 0) {
-            fail("Not enough files to run the test");
-        }
-
-        for (File file : files) {
-            LOG.debug("Trying to read file {}", file.getName());
-
-            PutObjectRequest putObjectRequest = PutObjectRequest.builder()
-                    .bucket(bucketName)
-                    .key(file.getName())
-                    .build();
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
-            awsS3Client.putObject(putObjectRequest, file.toPath());
-        }
+        assertEquals(expect, received,  "Didn't process the expected amount of messages");
     }
 
-    private void sendFiles() {
-        URL resourceDir = this.getClass().getResource(".");
-        File baseTestDir = new File(resourceDir.getFile());
-
-        sendFilesFromPath(baseTestDir);
-    }
 
     @Test
     @Timeout(180)
     public void testBasicSendReceive() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withConfiguration(TestS3Configuration.class.getName())
                 .withBucketNameOrArn(bucketName)
                 .withAmazonConfig(service.getConnectionProperties());
 
-        runTest(connectorPropertyFactory);
-
-        assertEquals(expect, received,  "Didn't process the expected amount of messages");
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
@@ -185,15 +137,13 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
     public void testBasicSendReceiveWithMaxMessagesPerPoll() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withConfiguration(TestS3Configuration.class.getName())
                 .withMaxMessagesPerPoll(5)
                 .withBucketNameOrArn(bucketName)
                 .withAmazonConfig(service.getConnectionProperties());
 
-        runTest(connectorPropertyFactory);
-
-        assertEquals(expect, received,  "Didn't process the expected amount of messages");
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
@@ -201,14 +151,12 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
     public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withConfiguration(TestS3Configuration.class.getName())
                 .withBucketNameOrArn(bucketName)
                 .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
 
-        runTest(connectorPropertyFactory);
-
-        assertEquals(expect, received,  "Didn't process the expected amount of messages");
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
@@ -218,7 +166,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
 
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withConfiguration(TestS3Configuration.class.getName())
                 .withUrl(bucketName)
                     .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
@@ -227,46 +175,6 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
                     .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.id()))
                 .buildUrl();
 
-        runTest(connectorPropertyFactory);
-
-        assertEquals(expect, received,  "Didn't process the expected amount of messages");
-    }
-
-
-
-    /* To run this test create (large) files in the a test directory
-        (ie.: dd if=/dev/random of=large bs=512 count=50000)
-
-        Then run it with:
-
-        mvn -DskipIntegrationTests=false -Denable.slow.tests=true
-            -Daws-service.s3.test.directory=/path/to/manual-s3
-            -Dit.test=CamelSourceAWSS3ITCase#testBasicSendReceiveWithKafkaStyleLargeFile verify
-     */
-    @EnabledIfSystemProperty(named = "aws-service.s3.test.directory", matches = ".*",
-            disabledReason = "Manual test that requires the user to provide a directory with files")
-    @Test
-    @Timeout(value = 60, unit = TimeUnit.MINUTES)
-    public void testBasicSendReceiveWithKafkaStyleLargeFile() throws ExecutionException, InterruptedException {
-        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
-                .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                .withConfiguration(TestS3Configuration.class.getName())
-                .withBucketNameOrArn(bucketName)
-                .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
-
-        String filePath = System.getProperty("aws-service.s3.test.directory");
-
-        File path = new File(filePath);
-
-        runTest(connectorPropertyFactory, () -> sendFilesFromPath(path));
-
-        String[] files = path.list();
-        if (files == null) {
-            fail("Either I/O error or the path used is not a directory");
-        }
-
-        assertEquals(files.length, received, "Didn't process the expected amount of messages");
+        runTest(connectorPropertyFactory, topicName, expect);
     }
-
 }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
new file mode 100644
index 0000000..6d7df7e
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/source/CamelSourceAWSS3LargeFilesITCase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.s3.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils;
+import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.AWSCommon;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.createBucket;
+import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.deleteBucket;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+/* To run this test, create (large) files in a test directory
+   (e.g.: dd if=/dev/random of=large.test bs=512 count=50000)
+
+   Note: they must have the .test extension.
+
+   Then run it with:
+
+   mvn -DskipIntegrationTests=false -Daws-service.s3.test.directory=/path/to/manual-s3
+       -Dit.test=CamelSourceAWSS3LargeFilesITCase verify
+*/
+@EnabledIfSystemProperty(named = "aws-service.s3.test.directory", matches = ".*",
+        disabledReason = "Manual test that requires the user to provide a directory with files")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CamelSourceAWSS3LargeFilesITCase extends CamelSourceTestSupport {
+    @RegisterExtension
+    public static AWSService service = AWSServiceFactory.createS3Service();
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3LargeFilesITCase.class);
+
+    private S3Client awsS3Client;
+    private String bucketName;
+    private String topicName;
+
+    private int expect;
+    private File[] files;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-aws2-s3-kafka-connector"};
+    }
+
+    @BeforeAll
+    public void setupTestFiles() throws IOException {
+        String filePath = System.getProperty("aws-service.s3.test.directory");
+        File baseTestDir = new File(filePath);
+
+        files = S3Utils.getFilesToSend(baseTestDir);
+
+        expect = files.length;
+    }
+
+
+    @BeforeEach
+    public void setUp() {
+        topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+        awsS3Client = AWSSDKClientUtils.newS3Client();
+        bucketName = AWSCommon.DEFAULT_S3_BUCKET + TestUtils.randomWithRange(0, 100);
+
+        try {
+            createBucket(awsS3Client, bucketName);
+        } catch (Exception e) {
+            LOG.error("Unable to create bucket: {}", e.getMessage(), e);
+            fail("Unable to create bucket");
+        }
+    }
+
+    @AfterEach
+    public void tearDown() {
+        try {
+            deleteBucket(awsS3Client, bucketName);
+        } catch (Exception e) {
+            LOG.warn("Unable to delete bucked: {}", e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void produceTestData() {
+        S3Utils.sendFilesFromPath(awsS3Client, bucketName, files);
+    }
+
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
+
+        assertEquals(expect, received,  "Didn't process the expected amount of messages");
+    }
+
+
+    @Test
+    @Timeout(value = 60, unit = TimeUnit.MINUTES)
+    public void testBasicSendReceiveWithKafkaStyleLargeFile() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withConfiguration(TestS3Configuration.class.getName())
+                .withBucketNameOrArn(bucketName)
+                .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE);
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java
index e9bdf96..d4b11ac 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java
@@ -21,16 +21,15 @@ import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.AWSConfigs;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
 import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -48,16 +47,15 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
-
+public class CamelSourceAWSSQSITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static AWSService service = AWSServiceFactory.createSQSService();
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class);
 
     private AWSSQSClient awssqsClient;
     private String queueName;
+    private String topicName;
 
-    private volatile int received;
     private final int expect = 10;
 
     @Override
@@ -67,12 +65,13 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
+
         awssqsClient = new AWSSQSClient(AWSSDKClientUtils.newSQSClient());
         queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000);
 
         // TODO: this is a work-around for CAMEL-15833
         awssqsClient.createQueue(queueName);
-        received = 0;
     }
 
     @AfterEach
@@ -82,32 +81,18 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
         }
     }
 
-    private boolean checkRecord(ConsumerRecord<String, String> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
-    }
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
+    @Override
+    protected void produceTestData() {
         LOG.debug("Sending SQS messages");
         for (int i = 0; i < expect; i++) {
             awssqsClient.send(queueName, "Source test message " + i);
         }
         LOG.debug("Done sending SQS messages");
+    }
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
-
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
@@ -116,11 +101,11 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
     public void testBasicSendReceive() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withQueueOrArn(queueName)
                 .withAmazonConfig(service.getConnectionProperties());
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     // This test does not run remotely because SQS has a cool down period for
@@ -131,11 +116,11 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
     public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withQueueOrArn(queueName)
                 .withAmazonConfig(service.getConnectionProperties(), CamelAWSSQSPropertyFactory.KAFKA_STYLE);
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     // This test does not run remotely because SQS has a cool down period for
@@ -148,7 +133,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
 
         ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withUrl(queueName)
                 .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
                 .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
@@ -157,6 +142,6 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
                 .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Region.US_EAST_1.toString()))
                 .buildUrl();
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 7c9ee9b..35626a3 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -56,12 +56,26 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
      * @throws Exception For test-specific exceptions
      */
     public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer) throws ExecutionException, InterruptedException {
+        runTest(connectorPropertyFactory, consumer, this::produceTestData);
+    }
+
+    /**
+     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param consumer A Kafka consumer for the test messages
+     * @param producer A producer for the test messages
+     * @throws Exception For test-specific exceptions
+     */
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer,
+                        FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         LOG.debug("Initialized the connector and put the data for the test execution");
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+//        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
         LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
-        produceTestData();
+        producer.produceMessages();
 
         LOG.debug("Creating the Kafka consumer ...");
         consumer.consumeMessages();


[camel-kafka-connector] 01/14: Added missing slow tests filter to avoid trying to run the AWS v2 test on GH actions

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit b276cf207e09ef2a7875ed0eb3e2e1a37069f733
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Feb 9 11:22:22 2021 +0100

    Added missing slow tests filter to avoid trying to run the AWS v2 test on GH actions
---
 .../camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java     | 2 ++
 .../camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java     | 2 ++
 .../kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java   | 1 -
 .../camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java       | 2 ++
 4 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
index 0c64bcf..7396cd2 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
@@ -36,6 +36,7 @@ import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +47,7 @@ import software.amazon.awssdk.services.ec2.model.InstanceStatus;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
 public class CamelSinkAWSEC2ITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createEC2Service();
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
index f88e078..3c696b5 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
@@ -36,6 +36,7 @@ import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +47,7 @@ import software.amazon.awssdk.services.iam.model.User;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
 public class CamelSinkAWSIAMITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createIAMService();
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
index 9ca84ef..ff7ccfc 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
@@ -111,7 +111,6 @@ public class CamelSinkAWSKinesisITCase  extends CamelSinkTestSupport {
     }
 
 
-
     @Override
     protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
         if (latch.await(110, TimeUnit.SECONDS)) {
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
index 1311507..fe404c4 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
@@ -38,6 +38,7 @@ import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +50,7 @@ import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.deleteBuc
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
 public class CamelSinkAWSS3ITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static AWSService service = AWSServiceFactory.createS3Service();


[camel-kafka-connector] 08/14: Converted the SJMS2 source test cases to use the reusable source base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e95b9fb3bd2f1874625fa81587a0400b52ce505f
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 21:21:28 2021 +0100

    Converted the SJMS2 source test cases to use the reusable source base class
---
 .../common/test/AbstractTestMessageConsumer.java   |   8 +-
 .../common/test/CamelSourceTestSupport.java        |  45 +++++++-
 .../common/test/IntegerMessageConsumer.java        |  29 +++++
 .../sjms2/source/CamelSourceJMSITCase.java         | 127 ++++++++-------------
 .../source/CamelSourceJMSWithAggregation.java      | 110 +++++++++++-------
 5 files changed, 186 insertions(+), 133 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java
index 2fcf42f..744abbf 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java
@@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsumer<T> {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTestMessageConsumer.class);
 
-    private final KafkaClient<String, T> kafkaClient;
-    private final String topicName;
+    protected final KafkaClient<String, T> kafkaClient;
+    protected final String topicName;
     private final int count;
     private final List<ConsumerRecord<String, T>> receivedMessages;
     private volatile int received;
@@ -42,7 +42,7 @@ public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsu
         receivedMessages = new ArrayList<>(count);
     }
 
-    private boolean checkRecord(ConsumerRecord<String, T> record) {
+    public boolean checkRecord(ConsumerRecord<String, T> record) {
         LOG.debug("Received: {}", record.value());
         received++;
         receivedMessages.add(record);
@@ -63,4 +63,6 @@ public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsu
     public List<ConsumerRecord<String, T>> consumedMessages() {
         return receivedMessages;
     }
+
+
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 35626a3..7f8b03c 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -33,7 +33,7 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
     protected abstract void verifyMessages(TestMessageConsumer<?> consumer);
 
     /**
-     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results
      *
      * @param connectorPropertyFactory A factory for connector properties
      * @param topic the topic to send the messages to
@@ -49,7 +49,7 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
 
 
     /**
-     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results
      *
      * @param connectorPropertyFactory A factory for connector properties
      * @param consumer A Kafka consumer consumer for the test messages
@@ -60,7 +60,7 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
     }
 
     /**
-     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results
      *
      * @param connectorPropertyFactory A factory for connector properties
      * @param consumer A Kafka consumer consumer for the test messages
@@ -71,7 +71,6 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
                         FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         LOG.debug("Initialized the connector and put the data for the test execution");
-//        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
         getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
         LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
@@ -86,4 +85,42 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
         LOG.debug("Verified messages");
     }
 
+    /**
+     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param consumer A Kafka consumer for the test messages
+     * @throws Exception For test-specific exceptions
+     */
+    public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer) throws ExecutionException, InterruptedException {
+        runTestBlocking(connectorPropertyFactory, consumer, this::produceTestData);
+    }
+
+    /**
+     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param consumer A Kafka consumer for the test messages
+     * @param producer A producer for the test messages
+     * @throws Exception For test-specific exceptions
+     */
+    public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer,
+                        FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        LOG.debug("Initialized the connector and put the data for the test execution");
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+        LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
+        producer.produceMessages();
+
+        LOG.debug("Creating the Kafka consumer ...");
+        consumer.consumeMessages();
+        LOG.debug("Ran the Kafka consumer ...");
+
+        LOG.debug("Verifying messages");
+        verifyMessages(consumer);
+        LOG.debug("Verified messages");
+    }
+
+
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/IntegerMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/IntegerMessageConsumer.java
new file mode 100644
index 0000000..a49e00e
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/IntegerMessageConsumer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.test;
+
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+
+/**
+ * A consumer that receives the 'count' amount of integer messages from the Kafka broker
+ */
+public class IntegerMessageConsumer extends AbstractTestMessageConsumer<Integer> {
+    public IntegerMessageConsumer(KafkaClient<String, Integer> kafkaClient, String topicName, int count) {
+        super(kafkaClient, topicName, count);
+    }
+}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
index 5729c15..781e029 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
@@ -20,26 +20,24 @@ package org.apache.camel.kafkaconnector.sjms2.source;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.IntegerMessageConsumer;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
 import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
 import org.apache.camel.test.infra.messaging.services.MessagingService;
 import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
 
 
 /**
@@ -47,16 +45,14 @@ import static org.junit.jupiter.api.Assertions.fail;
  * messages
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceJMSITCase extends AbstractKafkaTest {
+public class CamelSourceJMSITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
             .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
             .build();
 
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class);
-
-    private int received;
+    private String topicName;
     private final int expect = 10;
     private JMSClient jmsClient;
 
@@ -76,101 +72,68 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        received = 0;
-        jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
-    }
-
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
+        topicName = getTopicForTest(this);
 
-        return true;
     }
 
+    @BeforeAll
+    public void setupClient() {
+        jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
+    }
 
-
-    public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
+    @Override
+    protected void produceTestData() {
         JMSClient.produceMessages(jmsClient, SJMS2Common.DEFAULT_JMS_QUEUE, expect, "Test string message");
+    }
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
-
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
+
     @Test
     @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
-                    .withConnectionProperties(connectionProperties());
-
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testBasicSendReceive() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
+                .withConnectionProperties(connectionProperties());
+
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(90)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withConnectionProperties(connectionProperties())
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
-                        .buildUrl();
-
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withConnectionProperties(connectionProperties())
+                .withKafkaTopic(topicName)
+                .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
+                    .buildUrl();
+
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
 
     @Test
     @Timeout(90)
-    public void testIntSendReceive() {
-        try {
-            final String jmsQueueName = "testIntSendReceive";
-
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()) + jmsQueueName)
-                    .withDestinationName(jmsQueueName)
-                    .withConnectionProperties(connectionProperties());
-
-            connectorPropertyFactory.log();
-            getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-            JMSClient.produceMessages(jmsClient, jmsQueueName, expect);
+    public void testIntSendReceive() throws ExecutionException, InterruptedException {
+        final String jmsQueueName = "testIntSendReceive";
 
-            LOG.debug("Creating the consumer ...");
-            KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-            kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()) + "testIntSendReceive", this::checkRecord);
-            LOG.debug("Created the consumer ...");
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withDestinationName(jmsQueueName)
+                .withConnectionProperties(connectionProperties());
 
-            assertEquals(received, expect, "Didn't process the expected amount of messages");
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        IntegerMessageConsumer consumer = new IntegerMessageConsumer(kafkaClient, topicName, expect);
 
+        runTest(connectorPropertyFactory, consumer, () -> JMSClient.produceMessages(jmsClient, jmsQueueName, expect));
     }
 
 
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
index 0603a78..6b95304 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
@@ -20,44 +20,65 @@ package org.apache.camel.kafkaconnector.sjms2.source;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageConsumer;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
 import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
 import org.apache.camel.test.infra.messaging.services.MessagingService;
 import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
+public class CamelSourceJMSWithAggregation extends CamelSourceTestSupport {
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
             .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
             .build();
 
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class);
-
-    private int received;
     private final int sentSize = 10;
     private final int expect = 1;
     private JMSClient jmsClient;
-    private String receivedMessage = "";
     private String expectedMessage = "";
     private String queueName;
+    private String topicName;
+
+    class GreedyConsumer extends StringMessageConsumer {
+
+        public GreedyConsumer(KafkaClient<String, String> kafkaClient, String topicName, int count) {
+            super(kafkaClient, topicName, count);
+        }
+
+        @Override
+        public void consumeMessages() {
+            int retries = 10;
+
+            do {
+                kafkaClient.consumeAvailable(super.topicName, super::checkRecord);
+                if (consumedMessages().size() == 0) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        break;
+                    }
+                }
+
+            } while (consumedMessages().size() == 0);
+        }
+    }
 
     private Properties connectionProperties() {
         Properties properties = new Properties();
@@ -73,9 +94,8 @@ public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
         return new String[] {"camel-sjms2-kafka-connector"};
     }
 
-    @BeforeEach
-    public void setUp() {
-        received = 0;
+    @BeforeAll
+    public void setupClient() {
         jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
 
         for (int i = 0; i < sentSize - 1; i++) {
@@ -83,53 +103,55 @@ public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
         }
 
         expectedMessage += "hello;";
-        queueName = SJMS2Common.DEFAULT_JMS_QUEUE + "." + TestUtils.randomWithRange(1, 100);
     }
 
-    private void checkRecord(ConsumerRecord<String, String> record) {
-        receivedMessage += record.value();
-        LOG.debug("Received: {}", receivedMessage);
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
 
-        received++;
+        queueName = SJMS2Common.DEFAULT_JMS_QUEUE + "." + TestUtils.randomWithRange(1, 100);
     }
 
-    private static String textToSend(Integer i) {
-        return "hello;";
+    @Override
+    protected void produceTestData() {
+        JMSClient.produceMessages(jmsClient, queueName, sentSize,
+                CamelSourceJMSWithAggregation::textToSend);
     }
 
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
-    public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
-        JMSClient.produceMessages(jmsClient, queueName, sentSize,
-                CamelSourceJMSWithAggregation::textToSend);
+        Object receivedObject = consumer.consumedMessages().get(0).value();
+        if (!(receivedObject instanceof String)) {
+            fail("Unexpected message type");
+        }
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consumeAvailable(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
+        String receivedMessage = (String) receivedObject;
 
         assertEquals(expect, received, "Didn't process the expected amount of messages");
         assertEquals(expectedMessage, receivedMessage, "The messages don't match");
     }
 
+
+    private static String textToSend(Integer i) {
+        return "hello;";
+    }
+
     @Test
     @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withDestinationName(queueName)
-                    .withConnectionProperties(connectionProperties())
-                    .withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", sentSize,
-                            1000);
-
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testBasicSendReceive() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withDestinationName(queueName)
+                .withConnectionProperties(connectionProperties())
+                .withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", sentSize,
+                        1000);
+
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        GreedyConsumer greedyConsumer = new GreedyConsumer(kafkaClient, topicName, expect);
+
+        runTestBlocking(connectorPropertyFactory, greedyConsumer);
     }
 }


[camel-kafka-connector] 04/14: Converted the Azure storage queue source test case to use the reusable source base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 0fa21491c49652a8ba4ef6b863b2429cf9d06318
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 18:35:26 2021 +0100

    Converted the Azure storage queue source test case to use the reusable source base class
---
 .../source/CamelSourceAzureStorageQueueITCase.java | 57 +++++++---------------
 1 file changed, 18 insertions(+), 39 deletions(-)

diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
index f13eeb1..26faa6e 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
@@ -24,14 +24,13 @@ import com.azure.storage.queue.QueueClient;
 import com.azure.storage.queue.QueueServiceClient;
 import org.apache.camel.kafkaconnector.azure.storage.queue.common.TestQueueConfiguration;
 import org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageClientUtils;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.azure.common.AzureCredentialsHolder;
 import org.apache.camel.test.infra.azure.common.services.AzureService;
 import org.apache.camel.test.infra.azure.storage.queue.services.AzureStorageQueueServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -39,23 +38,20 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Disabled(value = "Disabled due to issue #976")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceAzureStorageQueueITCase extends AbstractKafkaTest {
+public class CamelSourceAzureStorageQueueITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static AzureService service = AzureStorageQueueServiceFactory.createAzureService();
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAzureStorageQueueITCase.class);
 
     private QueueServiceClient client;
     private QueueClient queueClient;
     private String queueName;
+    private String topicName;
     private int expect = 10;
-    private int received;
 
     @Override
     protected String[] getConnectorsInTest() {
@@ -64,11 +60,11 @@ public class CamelSourceAzureStorageQueueITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         client = AzureStorageClientUtils.getClient();
         queueName = "test-queue" + TestUtils.randomWithRange(0, 100);
 
         queueClient = client.createQueue(queueName);
-        received = 0;
     }
 
     @AfterEach
@@ -78,39 +74,22 @@ public class CamelSourceAzureStorageQueueITCase extends AbstractKafkaTest {
         }
     }
 
-    private void sendMessages() {
-        for (int i = 0; i < expect; i++) {
-            queueClient.sendMessage("Test message " + i);
-        }
-    }
-
-    private boolean checkRecord(ConsumerRecord<String, String> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
-    }
-
-    public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
+    @Override
+    protected void produceTestData() {
         sendMessages();
+    }
 
-        LOG.debug("Initialized the connector and put the data for the test execution");
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
-
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
+    private void sendMessages() {
+        for (int i = 0; i < expect; i++) {
+            queueClient.sendMessage("Test message " + i);
+        }
+    }
 
     @Test
     @Timeout(90)
@@ -120,12 +99,12 @@ public class CamelSourceAzureStorageQueueITCase extends AbstractKafkaTest {
         ConnectorPropertyFactory connectorPropertyFactory = CamelSourceAzureStorageQueuePropertyFactory
                 .basic()
                 .withConfiguration(TestQueueConfiguration.class.getName())
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withAccessKey(azureCredentialsHolder.accountKey())
                 .withAccountName(azureCredentialsHolder.accountName())
                 .withQueueName(queueName);
 
-        runtTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
 }


[camel-kafka-connector] 05/14: Converted the Cassandra source test case to use the reusable source base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit fe801e03a8760237844299af1e869a89a64e85d4
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 19:53:45 2021 +0100

    Converted the Cassandra source test case to use the reusable source base class
---
 .../source/CamelSourceCassandraITCase.java         | 60 +++++++++-------------
 1 file changed, 24 insertions(+), 36 deletions(-)

diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
index 04c9402..6508546 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
@@ -19,18 +19,16 @@ package org.apache.camel.kafkaconnector.cassandra.source;
 
 import java.util.concurrent.ExecutionException;
 
-import com.datastax.oss.driver.api.core.cql.Row;
 import org.apache.camel.kafkaconnector.cassandra.clients.CassandraClient;
 import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestDataDao;
 import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestResultSetConversionStrategy;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.test.infra.cassandra.services.CassandraService;
 import org.apache.camel.test.infra.cassandra.services.CassandraServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -43,7 +41,7 @@ import static org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFacto
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceCassandraITCase extends AbstractKafkaTest {
+public class CamelSourceCassandraITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static CassandraService cassandraService = CassandraServiceFactory.createService();
 
@@ -51,18 +49,17 @@ public class CamelSourceCassandraITCase extends AbstractKafkaTest {
 
     private CassandraClient cassandraClient;
     private TestDataDao testDataDao;
+    private String topicName;
 
     private final int expect = 1;
-    private int received;
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-cql-kafka-connector"};
     }
 
-    @BeforeEach
-    public void setUp() {
-        received = 0;
+    @BeforeAll
+    public void setUpTestData() {
         cassandraClient = new CassandraClient(cassandraService.getCassandraHost(), cassandraService.getCQL3Port());
 
         testDataDao = cassandraClient.newTestDataDao();
@@ -76,7 +73,12 @@ public class CamelSourceCassandraITCase extends AbstractKafkaTest {
         }
     }
 
-    @AfterEach
+    @BeforeEach
+    public void setUpTest() {
+        topicName = getTopicForTest(this);
+    }
+
+    @AfterAll
     public void tearDown() {
         if (testDataDao != null) {
             try {
@@ -87,59 +89,45 @@ public class CamelSourceCassandraITCase extends AbstractKafkaTest {
         }
     }
 
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        return false;
+    @Override
+    protected void produceTestData() {
+        // NO-OP (the test data is inserted once in setUpTestData)
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, Row> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
-
     @Timeout(90)
     @Test
     public void testRetrieveFromCassandra() throws ExecutionException, InterruptedException {
-        String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
         ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory
                 .basic()
-                .withKafkaTopic(topic)
+                .withKafkaTopic(topicName)
                 .withHosts(cassandraService.getCassandraHost())
                 .withPort(cassandraService.getCQL3Port())
                 .withKeySpace(TestDataDao.KEY_SPACE)
                 .withResultSetConversionStrategy("ONE")
                 .withCql(testDataDao.getSelectStatement());
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Timeout(90)
     @Test
     public void testRetrieveFromCassandraWithCustomStrategy() throws ExecutionException, InterruptedException {
-        String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
         ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory
                 .basic()
-                .withKafkaTopic(topic)
+                .withKafkaTopic(topicName)
                 .withHosts(cassandraService.getCassandraHost())
                 .withPort(cassandraService.getCQL3Port())
                 .withKeySpace(TestDataDao.KEY_SPACE)
                 .withResultSetConversionStrategy(classRef(TestResultSetConversionStrategy.class.getName()))
                 .withCql(testDataDao.getSelectStatement());
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }


[camel-kafka-connector] 02/14: Added a reusable base class for the source tests

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit fc34cbaf2bd7941407d3d74ce82f0d14351f2142
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 13:50:17 2021 +0100

    Added a reusable base class for the source tests
---
 .../common/test/AbstractTestMessageConsumer.java   | 66 +++++++++++++++++++
 .../common/test/CamelSourceTestSupport.java        | 75 ++++++++++++++++++++++
 .../common/test/StringMessageConsumer.java         | 29 +++++++++
 .../common/test/TestMessageConsumer.java           | 27 ++++++++
 4 files changed, 197 insertions(+)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java
new file mode 100644
index 0000000..2fcf42f
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsumer<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTestMessageConsumer.class);
+
+    private final KafkaClient<String, T> kafkaClient;
+    private final String topicName;
+    private final int count;
+    private final List<ConsumerRecord<String, T>> receivedMessages;
+    private volatile int received;
+
+    public AbstractTestMessageConsumer(KafkaClient<String, T> kafkaClient, String topicName, int count) {
+        this.kafkaClient = kafkaClient;
+        this.topicName = topicName;
+        this.count = count;
+
+        receivedMessages = new ArrayList<>(count);
+    }
+
+    private boolean checkRecord(ConsumerRecord<String, T> record) {
+        LOG.debug("Received: {}", record.value());
+        received++;
+        receivedMessages.add(record);
+
+        if (received == count) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public void consumeMessages() {
+        kafkaClient.consume(topicName, this::checkRecord);
+    }
+
+    @Override
+    public List<ConsumerRecord<String, T>> consumedMessages() {
+        return receivedMessages;
+    }
+}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
new file mode 100644
index 0000000..7c9ee9b
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.test;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceTestSupport.class);
+
+    protected abstract void produceTestData();
+
+    protected abstract void verifyMessages(TestMessageConsumer<?> consumer);
+
+    /**
+     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param topic the Kafka topic to consume the test messages from
+     * @param count the number of messages to consume before verifying the results
+     * @throws Exception For test-specific exceptions
+     */
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws ExecutionException, InterruptedException {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        StringMessageConsumer consumer = new StringMessageConsumer(kafkaClient, topic, count);
+
+        runTest(connectorPropertyFactory, consumer);
+    }
+
+
+    /**
+     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param consumer A Kafka consumer for the test messages
+     * @throws Exception For test-specific exceptions
+     */
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        LOG.debug("Initialized the connector and put the data for the test execution");
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+        LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
+        produceTestData();
+
+        LOG.debug("Creating the Kafka consumer ...");
+        consumer.consumeMessages();
+        LOG.debug("Ran the Kafka consumer ...");
+
+        LOG.debug("Verifying messages");
+        verifyMessages(consumer);
+        LOG.debug("Verified messages");
+    }
+
+}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageConsumer.java
new file mode 100644
index 0000000..f2105b0
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/StringMessageConsumer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.test;
+
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+
+/**
+ * A consumer that receives the 'count' amount of text messages from the Kafka broker
+ */
+public class StringMessageConsumer extends AbstractTestMessageConsumer<String> {
+    public StringMessageConsumer(KafkaClient<String, String> kafkaClient, String topicName, int count) {
+        super(kafkaClient, topicName, count);
+    }
+}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageConsumer.java
new file mode 100644
index 0000000..2034539
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageConsumer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.test;
+
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public interface TestMessageConsumer<T> {
+    void consumeMessages();
+    List<ConsumerRecord<String, T>> consumedMessages();
+}


[camel-kafka-connector] 11/14: Converted the Syslog source test case to use the reusable source base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit de00cf1f2460089ac3d2a503a5a73c419263d9f5
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Feb 9 09:48:12 2021 +0100

    Converted the Syslog source test case to use the reusable source base class
---
 .../syslog/source/CamelSourceSyslogITCase.java     | 124 ++++++++++-----------
 1 file changed, 59 insertions(+), 65 deletions(-)

diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
index 34d8228..7f14a2e 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
@@ -17,17 +17,22 @@
 
 package org.apache.camel.kafkaconnector.syslog.source;
 
+import java.util.concurrent.ExecutionException;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.syslog.SyslogDataFormat;
 import org.apache.camel.component.syslog.netty.Rfc5425Encoder;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageConsumer;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.TestInstance;
@@ -36,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
 
 
 /**
@@ -44,91 +48,81 @@ import static org.junit.jupiter.api.Assertions.fail;
  * messages
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceSyslogITCase extends AbstractKafkaTest {
-    private static final int FREE_PORT = NetworkUtils.getFreePort("localhost", NetworkUtils.Protocol.UDP);
-
+public class CamelSourceSyslogITCase extends CamelSourceTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSyslogITCase.class);
+    private static final String HOST = "localhost";
+    private static final String PROTOCOL = "udp";
+    private static final int FREE_PORT = NetworkUtils.getFreePort(HOST, NetworkUtils.Protocol.UDP);
 
-    private int received;
     private final int expect = 1;
+    private ConnectorPropertyFactory connectorPropertyFactory;
+    private String topicName;
+
+    private CamelContext camelContext;
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-syslog-kafka-connector"};
     }
 
+    @BeforeAll
+    public void setupCamelContext() throws Exception {
+        LOG.debug("Creating the Camel context");
+        camelContext = new DefaultCamelContext();
+        camelContext.getRegistry().bind("encoder", new Rfc5425Encoder());
+
+        LOG.debug("Adding routes");
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:test")
+                        .marshal(new SyslogDataFormat())
+                        .toF("netty:%s://%s:%d?sync=false&encoders=#encoder&useByteBuf=true", PROTOCOL, HOST, FREE_PORT);
+            }
+        });
+    }
+
     @BeforeEach
     public void setUp() {
-        received = 0;
-    }
+        topicName = getTopicForTest(this);
 
-    private void produceLogMessages(String protocol, String host, String port, String message) {
-        CamelContext camelContext = new DefaultCamelContext();
-
-        try {
-            camelContext.getRegistry().bind("encoder", new Rfc5425Encoder());
-            camelContext.addRoutes(new RouteBuilder() {
-                @Override
-                public void configure() {
-                    from("direct:test").marshal(new SyslogDataFormat()).to("netty:" + protocol + ":" + host + ":" + port + "?sync=false&encoders=#encoder&useByteBuf=true");
-                }
-            });
-
-            camelContext.start();
-            camelContext.createProducerTemplate().sendBody("direct:test", message);
-        } catch (Exception e) {
-            LOG.error("Failed to send log messages {} to : {}", message, "netty:" + protocol + ":" + host + ":" + port);
-            fail(e.getMessage());
-        } finally {
-            camelContext.stop();
-        }
+        camelContext.start();
+        TestUtils.waitFor(camelContext::isStarted);
     }
 
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
+    @AfterEach
+    public void tearDown() {
+        camelContext.stop();
     }
 
-    private void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+    @Override
+    protected void produceTestData() {
+        String message = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";
 
-        // Add a small delay to let Camel finish netty initialization otherwise the port may be unreachable
-        Thread.sleep(1000);
-        produceLogMessages(connectorPropertyFactory.getProperties().get("camel.source.path.protocol").toString(),
-                connectorPropertyFactory.getProperties().get("camel.source.path.host").toString(),
-                connectorPropertyFactory.getProperties().get("camel.source.path.port").toString(),
-                "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!");
+        camelContext.createProducerTemplate().sendBody("direct:test", message);
+    }
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
+
     @RepeatedTest(3)
     @Timeout(90)
-    public void testBasicSend() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
-                    .basic()
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withHost("localhost")
-                    .withPort(FREE_PORT)
-                    .withProtocol("udp");
-
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("Syslog test failed: {} {}", e.getMessage(), e);
-            fail(e.getMessage(), e);
-        }
+    public void testBasicSend() throws ExecutionException, InterruptedException {
+        connectorPropertyFactory = CamelSyslogPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withHost(HOST)
+                .withPort(FREE_PORT)
+                .withProtocol(PROTOCOL);
+
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        StringMessageConsumer stringMessageConsumer = new StringMessageConsumer(kafkaClient, topicName, expect);
+
+        runTestBlocking(connectorPropertyFactory, stringMessageConsumer);
     }
 }


[camel-kafka-connector] 10/14: Converted the SSH source test case to use the reusable source base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 550d1a2a64771200568ae40be24e5af4aa5448f8
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Feb 9 09:14:35 2021 +0100

    Converted the SSH source test case to use the reusable source base class
---
 .../ssh/sink/CamelSinkSshITCase.java               |  3 +-
 .../ssh/source/CamelSourceSshITCase.java           | 47 +++++++---------------
 2 files changed, 17 insertions(+), 33 deletions(-)

diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index d0535d4..02f6f21 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -28,6 +28,7 @@ import org.apache.camel.kafkaconnector.ssh.services.SshService;
 import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -38,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
         disabledReason = "Hangs when running with the embedded Kafka Connect instance")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkSshITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static SshService sshService = SshServiceFactory.createService();
@@ -69,7 +71,6 @@ public class CamelSinkSshITCase extends CamelSinkTestSupport {
     }
 
 
-
     @Override
     protected void consumeMessages(CountDownLatch latch) {
         latch.countDown();
diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
index 6673c01..488029d 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java
@@ -19,36 +19,29 @@ package org.apache.camel.kafkaconnector.ssh.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.ssh.services.SshService;
 import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
         disabledReason = "Hangs when running with the embedded Kafka Connect instance")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceSshITCase extends AbstractKafkaTest {
+public class CamelSourceSshITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static SshService sshService = SshServiceFactory.createService();
 
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSshITCase.class);
-
     private final int expect = 1;
-    private int received;
     private String oldUserHome = System.getProperty("user.home");
 
     @Override
@@ -56,41 +49,32 @@ public class CamelSourceSshITCase extends AbstractKafkaTest {
         return new String[] {"camel-ssh-kafka-connector"};
     }
 
-    @BeforeEach
+    @BeforeAll
     public void setupKeyHome() {
         System.setProperty("user.home", "target/user-home");
     }
 
-    @AfterEach
+    @AfterAll
     public void tearDownKeyHome() {
         System.setProperty("user.home", oldUserHome);
     }
 
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-
-        LOG.debug("Received: {}", record.value());
-        received++;
+    @Override
+    protected void produceTestData() {
 
-        return false;
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
-
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
+
     @Timeout(90)
     @Test
     public void testRetrieveFromSsh() throws ExecutionException, InterruptedException {
-        String topic = TestUtils.getDefaultTestTopic(this.getClass());
+        String topic = getTopicForTest(this);
 
         ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory
                 .basic()
@@ -105,7 +89,6 @@ public class CamelSourceSshITCase extends AbstractKafkaTest {
                     .withEntry("type", "org.apache.camel.kafkaconnector.ssh.transformers.SshTransforms")
                 .end();
 
-
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topic, expect);
     }
 }


[camel-kafka-connector] 13/14: Disabled Azure storage blob tests due to GH issue #997

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 2d018cbf550dd6648917c67c7514e6aa4cdd60e8
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Feb 9 17:04:16 2021 +0100

    Disabled Azure storage blob tests due to GH issue #997
---
 .../azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java        | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
index 727d3fb..80da7b1 100644
--- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
@@ -38,6 +38,7 @@ import org.apache.camel.test.infra.azure.storage.blob.clients.AzureStorageBlobCl
 import org.apache.camel.test.infra.azure.storage.blob.services.AzureStorageBlobServiceFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
@@ -48,6 +49,7 @@ import org.slf4j.LoggerFactory;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
+@Disabled(value = "Disabled due to GH issue #997")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
     @RegisterExtension


[camel-kafka-connector] 09/14: Converted the SQL source test case to use the reusable source base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f2f51b1309586306b4a8fb5124edee176d9d9f97
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Feb 9 09:09:38 2021 +0100

    Converted the SQL source test case to use the reusable source base class
---
 .../sql/source/CamelSourceSQLITCase.java           | 47 ++++++++--------------
 1 file changed, 17 insertions(+), 30 deletions(-)

diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java
index 4bf3bd5..6ee744c 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java
@@ -19,20 +19,15 @@ package org.apache.camel.kafkaconnector.sql.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.sql.services.TestDataSource;
 import org.apache.camel.test.infra.jdbc.services.JDBCService;
 import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.JdbcDatabaseContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
 
@@ -40,14 +35,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
         disabledReason = "Database connection fails when running with the embedded Kafka Connect instance")
-public class CamelSourceSQLITCase extends AbstractKafkaTest {
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSQLITCase.class);
-
+public class CamelSourceSQLITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public JDBCService sqlService;
 
     private final int expect = 1;
-    private int received;
 
     public CamelSourceSQLITCase() {
         JdbcDatabaseContainer<?> container = new PostgreSQLContainer<>("postgres:9.6.2")
@@ -69,34 +61,29 @@ public class CamelSourceSQLITCase extends AbstractKafkaTest {
         return new String[] {"camel-sql-kafka-connector"};
     }
 
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        return false;
+    @Override
+    protected void produceTestData() {
+        // NO-OP, already done via init script in the service initialization
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
-
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
     @Timeout(30)
     @Test
     public void testDBFetch() throws ExecutionException, InterruptedException {
-        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic().withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
-            .withQuery("select * from test").withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
+        String topicName = getTopicForTest(this);
+
+        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory
+                .basic()
+                .withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
+                .withQuery("select * from test")
+                .withTopics(topicName);
 
-        runTest(factory);
+        runTest(factory, topicName, expect);
 
     }
 }