Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 14:12:43 UTC

[camel-kafka-connector] branch camel-master updated (ad8d0a9 -> a0a30e7)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from ad8d0a9  SJMS2 itests: Avoid creating a new consumer for every message received
     new aaf1946  Make the AWS v2 sink test support reusable
     new c0e8ec1  Convert the AWS v1 tests to the new reusable sink test base class
     new e84c8c9  Convert the Azure storage blob tests to the new reusable sink test base class
     new 995b807  Convert the Azure storage queue tests to the new reusable sink test base class
     new b9b2fa0  Convert the Cassandra tests to the new reusable sink test base class
     new 51e7446  Convert the Couchbase tests to the new reusable sink test base class
     new 34dec6e  Convert the ElasticSearch tests to the new reusable sink test base class
     new 6cdd1de  Convert the File tests to the new reusable sink test base class
     new 2331048  Convert the HDFS tests to the new reusable sink test base class
     new 4f155bc  Convert the HTTP tests to the new reusable sink test base class
     new 114dfe5  Convert the JDBC tests to the new reusable sink test base class
     new 66ef021  Convert the MongoDB tests to the new reusable sink test base class
     new 16063c8  Convert the RabbitMQ tests to the new reusable sink test base class
     new 525def9  Removed unused variables from SJMS2 DLQ sink test
     new f2dc1aa  Convert the SJMS2 tests to the new reusable sink test base class
     new fd4565b  Convert the SQL tests to the new reusable sink test base class
     new f3cd060  Convert the SSH tests to the new reusable sink test base class
     new ad4a4f8  Convert the Syslog tests to the new reusable sink test base class
     new b7c040b  Fix error message check for SJMS2 startup on error test
     new 61fab49  Temporarily disabled an error handling test for SJMS2 due to failures on GH actions
     new 8db432a  Disabled MongoDB source test case due to GH issue #974
     new a0a30e7  Disabled Azure storage queue tests due to GH issue #976

The 22 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../aws/v1/sns/sink/CamelSinkAWSSNSITCase.java     | 131 +++++++----------
 .../aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java     | 130 +++++++----------
 .../aws/v2/cw/sink/CamelSinkAWSCWITCase.java       |   4 +-
 .../aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java     |   4 +-
 .../aws/v2/iam/sink/CamelSinkAWSIAMITCase.java     |   4 +-
 .../v2/kinesis/sink/CamelSinkAWSKinesisITCase.java |   4 +-
 .../aws/v2/kms/sink/CamelSinkAWSKMSITCase.java     |   4 +-
 .../aws/v2/s3/sink/CamelSinkAWSS3ITCase.java       |   4 +-
 .../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java     |   4 +-
 .../blob/sink/CamelSinkAzureStorageBlobITCase.java |  98 +++++++------
 .../sink/CamelSinkAzureStorageQueueITCase.java     |  82 +++++------
 .../source/CamelSourceAzureStorageQueueITCase.java |   2 +
 .../cassandra/sink/CamelSinkCassandraITCase.java   |  83 +++++------
 .../kafkaconnector/common/AbstractKafkaTest.java   |   5 +
 .../common/test/CamelSinkTestSupport.java}         |  41 +++++-
 .../common/test/TestMessageProducer.java}          |   9 +-
 .../couchbase/sink/CamelSinkCouchbaseITCase.java   | 100 ++++++-------
 .../sink/CamelSinkElasticSearchITCase.java         | 159 ++++++++-------------
 .../file/sink/CamelSinkFileITCase.java             | 122 ++++++++--------
 .../hdfs/sink/CamelSinkHDFSITCase.java             |  95 ++++++------
 .../http/sink/CamelSinkHTTPITCase.java             | 106 +++++++-------
 .../jdbc/sink/CamelSinkJDBCITCase.java             | 121 ++++++++--------
 .../mongodb/sink/CamelSinkMongoDBITCase.java       |  76 +++++-----
 .../mongodb/source/CamelSourceMongoDBITCase.java   |   2 +
 .../rabbitmq/sink/RabbitMQSinkITCase.java          | 120 ++++++++--------
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   |  73 ++++------
 .../sjms2/sink/CamelSinkJMSITCase.java             | 118 ++++++---------
 .../sjms2/sink/CamelSinkJMSStartupITCase.java      |   6 +-
 .../sjms2/sink/CamelSinkWithDLQJMSITCase.java      |   2 -
 .../sql/sink/CamelSinkSQLITCase.java               | 118 +++++++--------
 .../ssh/sink/CamelSinkSshITCase.java               |  67 +++++----
 .../syslog/sink/CamelSinkSyslogITCase.java         |  71 ++++-----
 32 files changed, 919 insertions(+), 1046 deletions(-)
 rename tests/{itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java => itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java} (67%)
 copy tests/{itests-jdbc/src/test/resources/schema.sql => itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java} (84%)
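
Taken together, the conversions in this series all land on the same shape: each test now extends CamelSinkTestSupport instead of AbstractKafkaTest, resolves its topic once with getTopicForTest(this), implements the messageHeaders/consumeMessages/verifyMessages hooks, and hands the produce-and-verify orchestration to runTest(factory, topicName, expect). A minimal sketch of that shape, using only names that appear in the diffs below (the example class and its placeholder factory are illustrative, not part of this change set):

    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
    import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.TestInstance;
    import org.junit.jupiter.api.Timeout;

    import static org.junit.jupiter.api.Assertions.fail;

    @TestInstance(TestInstance.Lifecycle.PER_CLASS)
    public class CamelSinkExampleITCase extends CamelSinkTestSupport {
        private String topicName;
        private final int expect = 10;

        @BeforeEach
        public void setUp() {
            topicName = getTopicForTest(this);
        }

        @Override
        protected Map<String, String> messageHeaders(String text, int current) {
            // most converted tests need no per-message headers
            return null;
        }

        @Override
        protected void consumeMessages(CountDownLatch latch) {
            try {
                // poll the system under test until the expected data arrives
            } finally {
                latch.countDown();
            }
        }

        @Override
        protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
            if (latch.await(30, TimeUnit.SECONDS)) {
                // assert on what the connector delivered
            } else {
                fail("Failed to receive the messages within the specified time");
            }
        }

        @Test
        @Timeout(90)
        public void testBasicSendReceive() throws Exception {
            runTest(createPropertyFactory(), topicName, expect);
        }

        private ConnectorPropertyFactory createPropertyFactory() {
            // connector-specific, e.g. CamelHTTPPropertyFactory.basic().withTopics(topicName).withHttpUri(url)
            throw new UnsupportedOperationException("replace with a connector-specific property factory");
        }
    }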


[camel-kafka-connector] 21/22: Disabled MongoDB source test case due to GH issue #974

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 8db432a81a5d41e9073b2ddcb4607c7f815488bc
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Feb 4 12:42:21 2021 +0100

    Disabled MongoDB source test case due to GH issue #974
---
 .../camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java   | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java
index 9d09fce..9260f05 100644
--- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java
+++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java
@@ -36,6 +36,7 @@ import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.bson.Document;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@Disabled(value = "Disabled due to issue #974")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSourceMongoDBITCase extends AbstractKafkaTest {
     @RegisterExtension


[camel-kafka-connector] 22/22: Disabled Azure storage queue tests due to GH issue #976

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit a0a30e75bc379dc006c9f4932f6ca77c36dd4aa2
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Feb 4 13:56:40 2021 +0100

    Disabled Azure storage queue tests due to GH issue #976
---
 .../azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java  | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
index b862255..f13eeb1 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
@@ -34,6 +34,7 @@ import org.apache.camel.test.infra.azure.storage.queue.services.AzureStorageQueu
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
@@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@Disabled(value = "Disabled due to issue #976")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSourceAzureStorageQueueITCase extends AbstractKafkaTest {
     @RegisterExtension


[camel-kafka-connector] 10/22: Convert the HTTP tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 4f155bc7243e5123c95a04c2ea392f001273f1f9
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 12:34:44 2021 +0100

    Convert the HTTP tests to the new reusable sink test base class
---
 .../http/sink/CamelSinkHTTPITCase.java             | 106 ++++++++++-----------
 1 file changed, 48 insertions(+), 58 deletions(-)

diff --git a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
index 17aa85f..ea5d2db 100644
--- a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
+++ b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
@@ -20,17 +20,15 @@ package org.apache.camel.kafkaconnector.http.sink;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.http.impl.bootstrap.HttpServer;
 import org.apache.http.impl.bootstrap.ServerBootstrap;
 import org.junit.jupiter.api.AfterEach;
@@ -45,13 +43,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkHTTPITCase extends AbstractKafkaTest {
+public class CamelSinkHTTPITCase extends CamelSinkTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkHTTPITCase.class);
     private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost");
 
     private HttpServer localServer;
 
     private HTTPTestValidationHandler validationHandler;
+    private List<String> replies;
+    private String topicName;
 
     private final int expect = 10;
 
@@ -62,6 +62,8 @@ public class CamelSinkHTTPITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() throws IOException {
+        topicName = getTopicForTest(this);
+
         validationHandler = new HTTPTestValidationHandler(10);
         byte[] ipAddr = new byte[]{127, 0, 0, 1};
         InetAddress localhost = InetAddress.getByAddress(ipAddr);
@@ -83,76 +85,64 @@ public class CamelSinkHTTPITCase extends AbstractKafkaTest {
         }
     }
 
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
-    private void putRecords() {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            try {
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test");
-            } catch (ExecutionException e) {
-                e.printStackTrace();
-            } catch (InterruptedException e) {
-                break;
-            }
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            replies = validationHandler
+                    .getReplies()
+                    .get(30, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.error("Unable to ret replies: {}", e.getMessage(), e);
+        } finally {
+            latch.countDown();
         }
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException, TimeoutException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(this::putRecords);
-
-        LOG.debug("Created the consumer ... About to receive messages");
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            if (replies == null) {
+                fail("Some messages should have been exchanged, but none seems to have gone through");
+            }
 
-        List<String> replies = validationHandler.getReplies().get(30, TimeUnit.SECONDS);
-        if (replies == null) {
-            fail("Some messages should have been exchanged, but none seems to have gone through");
-        }
+            for (String reply : replies) {
+                LOG.debug("Received: {} ", reply);
+            }
 
-        for (String reply : replies) {
-            LOG.debug("Received: {} ", reply);
+            assertEquals(replies.size(), expect, "Did not receive the same amount of messages that were sent");
+        } else {
+            fail("Failed to receive the messages within the specified time");
         }
-
-        assertEquals(replies.size(), expect, "Did not receive the same amount of messages that were sent");
-
     }
 
+
     @Test
     @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            String url = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc";
+    public void testBasicSendReceive() throws Exception {
+        String url = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc";
 
-            ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withHttpUri(url);
+        ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic()
+                .withTopics(topicName)
+                .withHttpUri(url);
 
-            runTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("HTTP test failed: {} {}", e.getMessage(), e);
-            fail(e.getMessage(), e);
-        }
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(90)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            String hostName = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc";
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        String hostName = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc";
 
-            ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withUrl(hostName)
-                        .buildUrl();
+        ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic()
+                .withTopics(topicName)
+                .withUrl(hostName)
+                    .buildUrl();
 
-
-            runTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("HTTP test failed: {} {}", e.getMessage(), e);
-            fail(e.getMessage(), e);
-        }
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }
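
The base class itself appears in this mail only as the 67%-similar rename from CamelSinkAWSTestSupport in the summary above, but the code each conversion deletes hints at what runTest(factory, topicName, expect) has to do: log and initialize the connector, produce the expected number of records on a background thread, hand a latch to consumeMessages, and finish with verifyMessages. A rough reconstruction under those assumptions (the KafkaClient calls and hook names come from the deleted code; the rest, including the inlined producing loop that the copied-in TestMessageProducer presumably owns, is guesswork):

    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
    import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;

    // Hypothetical reconstruction of CamelSinkTestSupport.runTest(); not the actual implementation.
    protected void runTest(ConnectorPropertyFactory factory, String topicName, int count) throws Exception {
        factory.log();
        getKafkaConnectService().initializeConnector(factory);

        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService service = Executors.newCachedThreadPool();

        // Produce the records on a background thread, as the deleted putRecords() methods did,
        // feeding the per-test hooks for the message body and the optional headers.
        service.submit(() -> {
            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());

            for (int i = 0; i < count; i++) {
                try {
                    String message = testMessageContent(i);
                    Map<String, String> headers = messageHeaders(message, i);

                    if (headers == null) {
                        kafkaClient.produce(topicName, message);
                    } else {
                        kafkaClient.produce(topicName, message, headers);
                    }
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    break;
                }
            }
        });

        // Let the concrete test drain the target system, then run its assertions.
        service.submit(() -> consumeMessages(latch));
        verifyMessages(latch);
    }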


[camel-kafka-connector] 05/22: Convert the Cassandra tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit b9b2fa047a803bd0441f110db6d42f6017c8e23b
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 11:33:28 2021 +0100

    Convert the Cassandra tests to the new reusable sink test base class
---
 .../cassandra/sink/CamelSinkCassandraITCase.java   | 83 +++++++++-------------
 1 file changed, 33 insertions(+), 50 deletions(-)

diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
index 56a0930..2949fff 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
@@ -17,17 +17,14 @@
 
 package org.apache.camel.kafkaconnector.cassandra.sink;
 
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.cassandra.clients.CassandraClient;
 import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestDataDao;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.cassandra.services.CassandraService;
 import org.apache.camel.test.infra.cassandra.services.CassandraServiceFactory;
@@ -40,11 +37,11 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkCassandraITCase extends AbstractKafkaTest {
+public class CamelSinkCassandraITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static CassandraService cassandraService = CassandraServiceFactory.createService();
 
@@ -52,6 +49,7 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
 
     private CassandraClient cassandraClient;
     private TestDataDao testDataDao;
+    private String topicName;
 
     private final int expect = 10;
     private int received;
@@ -63,6 +61,7 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         cassandraClient = new CassandraClient(cassandraService.getCassandraHost(), cassandraService.getCQL3Port());
 
         testDataDao = cassandraClient.newTestDataDao();
@@ -70,6 +69,8 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
         testDataDao.createKeySpace();
         testDataDao.useKeySpace();
         testDataDao.createTable();
+
+        received = 0;
     }
 
     @AfterEach
@@ -83,81 +84,63 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
         }
     }
 
-    private void putRecords(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
-            for (int i = 0; i < expect; i++) {
-                try {
-                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test " + i);
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
-                }
+            if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) {
+                fail("Did not receive enough data");
             }
+            testDataDao.getData(this::checkRetrievedData);
         } finally {
             latch.countDown();
         }
     }
 
-    private void checkRetrievedData(String data) {
-        if (data != null) {
-            received++;
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
         }
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        CountDownLatch latch = new CountDownLatch(1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(() -> putRecords(latch));
-
-        if (!latch.await(30, TimeUnit.SECONDS)) {
-            fail("Timed out wait for data to be added to the Kafka cluster");
-        }
-
-        if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) {
-            fail("Did not receive enough data");
+    private void checkRetrievedData(String data) {
+        if (data != null) {
+            received++;
         }
-        testDataDao.getData(this::checkRetrievedData);
-        assertTrue(received >= expect,
-                String.format("Did not receive as much data as expected: %d < %d", received, expect));
-
     }
 
     @Timeout(90)
     @Test
-    public void testFetchFromCassandra() throws ExecutionException, InterruptedException {
-        String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
+    public void testFetchFromCassandra() throws Exception {
         ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory
                 .basic()
-                .withTopics(topic)
+                .withTopics(topicName)
                 .withHosts(cassandraService.getCassandraHost())
                 .withPort(cassandraService.getCQL3Port())
                 .withKeySpace(TestDataDao.KEY_SPACE)
                 .withCql(testDataDao.getInsertStatement());
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Timeout(90)
     @Test
-    public void testFetchFromCassandraWithUrl() throws ExecutionException, InterruptedException {
-        String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
+    public void testFetchFromCassandraWithUrl() throws Exception {
         ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory
                 .basic()
-                    .withTopics(topic)
+                    .withTopics(topicName)
                     .withUrl(cassandraService.getCQL3Endpoint(), TestDataDao.KEY_SPACE)
                     .append("cql", testDataDao.getInsertStatement())
                     .buildUrl();
 
-        runTest(connectorPropertyFactory);
-
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }


[camel-kafka-connector] 13/22: Convert the RabbitMQ tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 16063c85a33d9a82e197de5a5317f6dc58fe314e
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 14:05:06 2021 +0100

    Convert the RabbitMQ tests to the new reusable sink test base class
---
 .../rabbitmq/sink/RabbitMQSinkITCase.java          | 120 ++++++++++-----------
 1 file changed, 60 insertions(+), 60 deletions(-)

diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
index d2c3ad6..01ad213 100644
--- a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
@@ -17,17 +17,18 @@
 package org.apache.camel.kafkaconnector.rabbitmq.sink;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.rabbitmq.client.DeliverCallback;
 import com.rabbitmq.client.Delivery;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
 import org.apache.camel.test.infra.rabbitmq.services.RabbitMQService;
 import org.apache.camel.test.infra.rabbitmq.services.RabbitMQServiceFactory;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -40,13 +41,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class RabbitMQSinkITCase extends AbstractKafkaTest {
+public class RabbitMQSinkITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static RabbitMQService rabbitmqService = RabbitMQServiceFactory.createService();
 
     private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkITCase.class);
     private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import";
 
+    private String topicName;
     private RabbitMQClient rabbitMQClient;
     private int received;
     private final int expect = 10;
@@ -57,9 +59,48 @@ public class RabbitMQSinkITCase extends AbstractKafkaTest {
     }
 
     @BeforeEach
-    public void setUp() {
+    public void setUp() throws Exception {
+        topicName = getTopicForTest(this);
         received = 0;
+
         rabbitMQClient =  new RabbitMQClient(rabbitmqService.getAmqpUrl());
+        rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE);
+        rabbitMQClient.start();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        rabbitMQClient.stop();
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        DeliverCallback deliveryCallback = (consumerTag, delivery) -> {
+            if (!this.checkRecord(delivery)) {
+                latch.countDown();
+            }
+        };
+
+        try {
+            rabbitMQClient.receive(DEFAULT_RABBITMQ_QUEUE, deliveryCallback);
+        } catch (Exception e) {
+            LOG.error("RabbitMQ test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(15, TimeUnit.SECONDS)) {
+            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
     }
 
     private boolean checkRecord(Delivery rabbitMQDelivery) {
@@ -75,65 +116,24 @@ public class RabbitMQSinkITCase extends AbstractKafkaTest {
         return true;
     }
 
-    private void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        CountDownLatch latch = new CountDownLatch(1);
-
-        LOG.debug("Creating the consumer ...");
-        rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE);
-        try {
-            rabbitMQClient.start();
-            consumeRabbitMQMessages(latch);
-
-            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-            for (int i = 0; i < expect; i++) {
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
-            }
-
-            LOG.debug("Created the consumer ... About to receive messages");
-
-            latch.await();
-            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } finally {
-            rabbitMQClient.stop();
-        }
-    }
-
     @Test
     @Timeout(90)
     public void testSource() throws Exception {
         ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
                 .basic()
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                .withUrl("")
-                .append("username", rabbitmqService.connectionProperties().username())
-                .append("password", rabbitmqService.connectionProperties().password())
-                .append("autoDelete", "false")
-                .append("queue", DEFAULT_RABBITMQ_QUEUE)
-                .append("RoutingKey", DEFAULT_RABBITMQ_QUEUE)
-                .append("skipExchangeDeclare", "true")
-                .append("skipQueueBind", "true")
-                .append("hostname", rabbitmqService.connectionProperties().hostname())
-                .append("portNumber", rabbitmqService.connectionProperties().port())
-                .buildUrl();
-
-        runBasicStringTest(factory);
-    }
-
-    private void consumeRabbitMQMessages(CountDownLatch latch) {
-        DeliverCallback deliveryCallback = (consumerTag, delivery) -> {
-            if (!this.checkRecord(delivery)) {
-                latch.countDown();
-            }
-        };
-        try {
-            rabbitMQClient.receive(DEFAULT_RABBITMQ_QUEUE, deliveryCallback);
-        } catch (Exception e) {
-            LOG.error("RabbitMQ test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+                .withTopics(topicName)
+                    .withUrl("")
+                    .append("username", rabbitmqService.connectionProperties().username())
+                    .append("password", rabbitmqService.connectionProperties().password())
+                    .append("autoDelete", "false")
+                    .append("queue", DEFAULT_RABBITMQ_QUEUE)
+                    .append("RoutingKey", DEFAULT_RABBITMQ_QUEUE)
+                    .append("skipExchangeDeclare", "true")
+                    .append("skipQueueBind", "true")
+                    .append("hostname", rabbitmqService.connectionProperties().hostname())
+                    .append("portNumber", rabbitmqService.connectionProperties().port())
+                    .buildUrl();
+
+        runTest(factory, topicName, expect);
     }
 }


[camel-kafka-connector] 18/22: Convert the Syslog tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit ad4a4f8b12765b316b0018e10e269b0154055db9
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 15:59:57 2021 +0100

    Convert the Syslog tests to the new reusable sink test base class
---
 .../syslog/sink/CamelSinkSyslogITCase.java         | 71 ++++++++++++----------
 1 file changed, 39 insertions(+), 32 deletions(-)

diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
index 9273964..1b9f942 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
@@ -16,19 +16,19 @@
  */
 package org.apache.camel.kafkaconnector.syslog.sink;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.syslog.services.SyslogService;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -39,15 +39,14 @@ import static org.junit.jupiter.api.Assertions.fail;
  * messages
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkSyslogITCase extends AbstractKafkaTest {
+public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
     private static final int FREE_PORT = NetworkUtils.getFreePort("localhost", NetworkUtils.Protocol.UDP);
+    private static final String TEST_TXT = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";
 
     @RegisterExtension
     public static SyslogService syslogService = new SyslogService("udp", "//localhost", FREE_PORT);
 
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSyslogITCase.class);
-
-    private int received;
+    private String topicName;
     private final int expect = 1;
 
     @Override
@@ -57,36 +56,44 @@ public class CamelSinkSyslogITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        received = 0;
+        topicName = getTopicForTest(this);
     }
 
-    private void runBasicProduceTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+    @Override
+    protected String testMessageContent(int current) {
+        return TEST_TXT;
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
-        LOG.debug("Creating the producer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!");
-        LOG.debug("Created the producer ...");
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        latch.countDown();
+    }
 
-        assertEquals("<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!", syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            assertEquals(TEST_TXT, syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
+        } else {
+            fail("Timed out wait for data to be added to the Kafka cluster");
+        }
     }
 
+
     @Test
     @Timeout(90)
-    public void testBasicReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
-                    .basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withHost("localhost")
-                    .withPort(FREE_PORT)
-                    .withProtocol("udp");
-
-            runBasicProduceTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("Syslog test failed: {} {}", e.getMessage(), e);
-            fail(e.getMessage(), e);
-        }
+    public void testBasicReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withHost("localhost")
+                .withPort(FREE_PORT)
+                .withProtocol("udp");
+
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }


[camel-kafka-connector] 20/22: Temporarily disabled an error handling test for SJMS2 due to failures on GH actions

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 61fab4984b78eae9102373e37bf137c9915410fa
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Feb 4 12:36:31 2021 +0100

    Temporarily disabled an error handling test for SJMS2 due to failures on GH actions
---
 .../camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java      | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
index 735db4b..ee25d26 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
@@ -27,6 +27,7 @@ import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
@@ -40,6 +41,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 /**
  * A simple test to make sure we are not losing or hiding exception data on errors
  */
+@Disabled(value = "Needs further investigation about why it is failing with camel master")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkJMSStartupITCase extends AbstractKafkaTest {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSStartupITCase.class);


[camel-kafka-connector] 14/22: Removed unused variables from SJMS2 DLQ sink test

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 525def966497aeccdcff17b2aaa72f90cac19f65
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 14:51:51 2021 +0100

    Removed unused variables from SJMS2 DLQ sink test
---
 .../camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java      | 2 --
 1 file changed, 2 deletions(-)

diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
index 8eaa5c6..70125c7 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkWithDLQJMSITCase.java
@@ -44,7 +44,6 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class CamelSinkWithDLQJMSITCase extends AbstractKafkaTest {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkWithDLQJMSITCase.class);
 
-    private int received;
     private final int expect = 10;
     private int errors;
     private final int expectedErrors = 1;
@@ -65,7 +64,6 @@ public class CamelSinkWithDLQJMSITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        received = 0;
         errors = 0;
     }
 


[camel-kafka-connector] 04/22: Convert the Azure storage queue tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 995b8078ceb673e99c91e91c26a96622a8a23187
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 11:27:01 2021 +0100

    Convert the Azure storage queue tests to the new reusable sink test base class
---
 .../sink/CamelSinkAzureStorageQueueITCase.java     | 82 ++++++++++------------
 1 file changed, 38 insertions(+), 44 deletions(-)

diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
index a7c16ed..d447703 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -17,20 +17,17 @@
 
 package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
 
-import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.azure.storage.queue.QueueClient;
 import com.azure.storage.queue.QueueServiceClient;
 import com.azure.storage.queue.models.PeekedMessageItem;
-import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.azure.storage.queue.common.TestQueueConfiguration;
 import org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageClientUtils;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.azure.common.AzureCredentialsHolder;
 import org.apache.camel.test.infra.azure.common.services.AzureService;
@@ -45,9 +42,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkAzureStorageQueueITCase extends AbstractKafkaTest {
+public class CamelSinkAzureStorageQueueITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static AzureService service = AzureStorageQueueServiceFactory.createAzureService();
 
@@ -56,6 +54,7 @@ public class CamelSinkAzureStorageQueueITCase extends AbstractKafkaTest {
     private QueueServiceClient client;
     private QueueClient queueClient;
     private String queueName;
+    private String topicName;
     private int expect = 10;
     private int received;
 
@@ -66,6 +65,8 @@ public class CamelSinkAzureStorageQueueITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
+
         client = AzureStorageClientUtils.getClient();
         queueName = "test-queue" + TestUtils.randomWithRange(0, 100);
 
@@ -80,6 +81,30 @@ public class CamelSinkAzureStorageQueueITCase extends AbstractKafkaTest {
         }
     }
 
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            consume();
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(120, TimeUnit.SECONDS)) {
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
     private void acknowledgeReceived(PeekedMessageItem peekedMessageItem) {
         received++;
         LOG.info("Received: {}", peekedMessageItem.getMessageText());
@@ -97,72 +122,41 @@ public class CamelSinkAzureStorageQueueITCase extends AbstractKafkaTest {
         int count = queueClient.getProperties().getApproximateMessagesCount();
 
         queueClient.peekMessages(count, null, null).forEach(this::acknowledgeReceived);
-
-    }
-
-    private void putRecords() {
-        Map<String, String> messageParameters = new HashMap<>();
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            try {
-                // This is for 3.4 only. From 3.5 and newer, the text is taken from the body
-                messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageQueueMessageText", "test " + i);
-
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test " + i, messageParameters);
-            } catch (ExecutionException e) {
-                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-            } catch (InterruptedException e) {
-                break;
-            }
-        }
-    }
-
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
-        putRecords();
-
-        consume();
-
-        assertEquals(expect, received, "Did not receive the same amount of messages that were sent");
     }
 
 
     @Test
     @Timeout(90)
-    public void testBasicSendReceive() throws InterruptedException, ExecutionException, IOException {
+    public void testBasicSendReceive() throws Exception {
         AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
 
         ConnectorPropertyFactory connectorPropertyFactory = CamelSinkAzureStorageQueuePropertyFactory
                 .basic()
                 .withConfiguration(TestQueueConfiguration.class.getName())
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withTopics(topicName)
                 .withAccessKey(azureCredentialsHolder.accountKey())
                 .withAccountName(azureCredentialsHolder.accountName())
                 .withOperation("sendMessage")
                 .withQueueName(queueName);
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
 
     @Test
     @Timeout(90)
-    public void testBasicSendReceiveUrl() throws InterruptedException, ExecutionException, IOException {
+    public void testBasicSendReceiveUrl() throws Exception {
         AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
 
         ConnectorPropertyFactory connectorPropertyFactory = CamelSinkAzureStorageQueuePropertyFactory
                 .basic()
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withTopics(topicName)
                 .withConfiguration(TestQueueConfiguration.class.getName())
                 .withUrl(azureCredentialsHolder.accountName() + "/" + queueName)
                 .append("accessKey", azureCredentialsHolder.accountKey())
                 .append("operation", "sendMessage")
                 .buildUrl();
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }
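
Most of the converted tests return null from messageHeaders, but the putRecords() deleted above shows where a non-null implementation matters: on Camel 3.4 the storage queue connector takes the message text from a header rather than from the record body. A sketch of the equivalent hook-based version, assuming only the constant and header name from the deleted code (the rest is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.camel.kafkaconnector.CamelSinkTask;

    // inside a CamelSinkTestSupport subclass such as the one above
    @Override
    protected Map<String, String> messageHeaders(String text, int current) {
        Map<String, String> headers = new HashMap<>();

        // On Camel 3.4 the queue message text comes from this header; from 3.5
        // onwards it is taken from the record body (per the comment removed above).
        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageQueueMessageText", text);

        return headers;
    }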


[camel-kafka-connector] 12/22: Convert the MongoDB tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 66ef021abdb27afdc31af09b82bb80612034c363
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 13:53:46 2021 +0100

    Convert the MongoDB tests to the new reusable sink test base class
---
 .../mongodb/sink/CamelSinkMongoDBITCase.java       | 76 ++++++++++++----------
 1 file changed, 42 insertions(+), 34 deletions(-)

diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
index eb4cf2f..da1b02a 100644
--- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
+++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
@@ -17,16 +17,16 @@
 
 package org.apache.camel.kafkaconnector.mongodb.sink;
 
-import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.mongodb.services.MongoDBService;
 import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory;
@@ -43,13 +43,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkMongoDBITCase extends AbstractKafkaTest {
+public class CamelSinkMongoDBITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static MongoDBService mongoDBService = MongoDBServiceFactory.createService();
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelMongoDBPropertyFactory.class);
 
     private MongoClient mongoClient;
+    private String topicName;
+    private final String databaseName = "testDB";
+    private final String collectionName = "testRecords";
 
     private final int expect = 10;
 
@@ -61,28 +64,44 @@ public class CamelSinkMongoDBITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         mongoClient = MongoClients.create(mongoDBService.getReplicaSetUrl());
     }
 
-    private void putRecords() {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @Override
+    protected String testMessageContent(int current) {
+        return String.format("{\"test\": \"value %d\"}", current);
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
-            for (int i = 0; i < expect; i++) {
-                String data = String.format("{\"test\": \"value %d\"}", i);
-
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), data);
-            }
-
-        } catch (ExecutionException e) {
-            LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOG.warn("The thread putting records to Kafka was interrupted");
-            fail("The thread putting records to Kafka was interrupted");
+            MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);
+            MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName);
+
+            LOG.info("Waiting for data on the MongoDB instance");
+            TestUtils.waitFor(() -> hasAllRecords(collection));
+        } finally {
+            latch.countDown();
         }
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(15, TimeUnit.SECONDS)) {
+            String databaseName = "testDB";
+            String collectionName = "testRecords";
+
+            verifyDocuments(databaseName, collectionName);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
     private boolean hasAllRecords(MongoCollection<Document> collection) {
         return collection.countDocuments() >= expect;
     }
@@ -91,34 +110,23 @@ public class CamelSinkMongoDBITCase extends AbstractKafkaTest {
         MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
         MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName);
 
-        TestUtils.waitFor(() -> hasAllRecords(collection));
-
         assertEquals(expect, collection.countDocuments());
     }
 
-    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
-        propertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
-
-        putRecords();
-    }
-
     @Test
-    @Timeout(90)
-    public void testBasicSendReceive() throws ExecutionException, InterruptedException {
+    @Timeout(30)
+    public void testBasicSendReceive() throws Exception {
         String connectionBeanRef = String.format("com.mongodb.client.MongoClients#create('%s')",
                 mongoDBService.getReplicaSetUrl());
 
         CamelMongoDBPropertyFactory factory = CamelMongoDBPropertyFactory.basic()
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withTopics(topicName)
                 .withConnectionBean("mongo",
                         BasicConnectorPropertyFactory.classRef(connectionBeanRef))
                 .withDatabase("testDB")
                 .withCollection("testRecords")
                 .withOperation("insert");
 
-        runTest(factory);
-
-        verifyDocuments("testDB", "testRecords");
+        runTest(factory, topicName, expect);
     }
 }
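
The conversions in this series all share the same shape: the test supplies the
connector properties, a per-test topic, and the consume/verify hooks, while the
base class drives the connector lifecycle and the Kafka production step. As a
minimal sketch (the "foo" connector, its property factory, and the endpoint
checks are hypothetical placeholders, not code from any of these commits), a
converted sink test looks like this:

    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
    import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.Timeout;

    import static org.junit.jupiter.api.Assertions.fail;

    public class CamelSinkFooITCase extends CamelSinkTestSupport {
        private String topicName;
        private final int expect = 10;

        @Override
        protected String[] getConnectorsInTest() {
            return new String[] {"camel-foo-kafka-connector"}; // hypothetical connector
        }

        @BeforeEach
        public void setUp() {
            // a fresh topic per test execution, so runs do not share state
            topicName = getTopicForTest(this);
        }

        @Override
        protected Map<String, String> messageHeaders(String text, int current) {
            return null; // this sketch needs no per-message headers
        }

        @Override
        protected void consumeMessages(CountDownLatch latch) {
            try {
                // block until the sink endpoint has received the test data
            } finally {
                latch.countDown(); // always release the verifier
            }
        }

        @Override
        protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
            if (latch.await(30, TimeUnit.SECONDS)) {
                // assert that the endpoint holds exactly what was sent
            } else {
                fail("Failed to receive the messages within the specified time");
            }
        }

        @Test
        @Timeout(90)
        public void testBasicSendReceive() throws Exception {
            ConnectorPropertyFactory factory = CamelFooPropertyFactory.basic() // hypothetical factory
                    .withTopics(topicName);

            runTest(factory, topicName, expect);
        }
    }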


[camel-kafka-connector] 02/22: Convert the AWS v1 tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit c0e8ec172a8dbaf6ee3158c431bd7ca986c338ad
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 10:54:01 2021 +0100

    Convert the AWS v1 tests to the new reusable sink test base class
---
 .../aws/v1/sns/sink/CamelSinkAWSSNSITCase.java     | 131 +++++++++------------
 .../aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java     | 130 ++++++++------------
 2 files changed, 102 insertions(+), 159 deletions(-)

diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
index 9d7cc80..8d893c9 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
@@ -18,19 +18,16 @@
 package org.apache.camel.kafkaconnector.aws.v1.sns.sink;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sqs.model.Message;
 import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.clients.AWSClientUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
@@ -52,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
+public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport {
 
     @RegisterExtension
     public static AWSService service = AWSServiceFactory.createSNSService();
@@ -61,6 +58,7 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
     private AWSSQSClient awsSqsClient;
     private String sqsQueueUrl;
     private String queueName;
+    private String topicName;
 
     private volatile int received;
     private final int expect = 10;
@@ -72,16 +70,31 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        awsSqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient());
+        topicName = getTopicForTest(this);
 
+        awsSqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient());
         queueName = AWSCommon.DEFAULT_SQS_QUEUE_FOR_SNS + "-" + TestUtils.randomWithRange(0, 1000);
         sqsQueueUrl = awsSqsClient.getQueue(queueName);
 
         LOG.info("Created SQS queue {}", sqsQueueUrl);
-
         received = 0;
     }
 
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(120, TimeUnit.SECONDS)) {
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
     private boolean checkMessages(List<Message> messages) {
         for (Message message : messages) {
             LOG.info("Received: {}", message.getBody());
@@ -96,7 +109,8 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
         return true;
     }
 
-    private void consumeMessages(CountDownLatch latch) {
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
             awsSqsClient.receiveFrom(sqsQueueUrl, this::checkMessages);
         } catch (Throwable t) {
@@ -107,92 +121,53 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest {
         }
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory)
-            throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        LOG.debug("Creating the consumer ...");
-        CountDownLatch latch = new CountDownLatch(1);
-        service.submit(() -> consumeMessages(latch));
-
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
-        }
+    @Test
+    @Timeout(value = 90)
+    public void testBasicSendReceive() throws Exception {
+        Properties amazonProperties = service.getConnectionProperties();
 
-        LOG.debug("Created the consumer ... About to receive messages");
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
+                .withName("CamelAWSSNSSinkConnectorDefault")
+                .withTopics(topicName)
+                .withTopicOrArn(queueName)
+                .withSubscribeSNStoSQS(sqsQueueUrl)
+                .withConfiguration(TestSNSConfiguration.class.getName())
+                .withAmazonConfig(amazonProperties);
 
-        if (latch.await(120, TimeUnit.SECONDS)) {
-            assertEquals(expect, received,
-                    "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } else {
-            fail("Failed to receive the messages within the specified time");
-        }
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(value = 90)
-    public void testBasicSendReceive() {
-        try {
-            Properties amazonProperties = service.getConnectionProperties();
-
-            ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
-                    .withName("CamelAWSSNSSinkConnectorDefault")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withTopicOrArn(queueName)
-                    .withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName())
-                    .withAmazonConfig(amazonProperties);
-
-            runTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("Amazon SNS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-    }
+    public void testBasicSendReceiveUsingKafkaStyle() throws Exception {
+        Properties amazonProperties = service.getConnectionProperties();
 
-    @Test
-    @Timeout(value = 90)
-    public void testBasicSendReceiveUsingKafkaStyle() {
-        try {
-            Properties amazonProperties = service.getConnectionProperties();
-
-            ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
-                    .withName("CamelAWSSNSSinkKafkaStyleConnector")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withTopicOrArn(queueName)
-                    .withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName())
-                    .withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE);
-
-            runTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("Amazon SNS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
+                .withName("CamelAWSSNSSinkKafkaStyleConnector")
+                .withTopics(topicName)
+                .withTopicOrArn(queueName)
+                .withSubscribeSNStoSQS(sqsQueueUrl)
+                .withConfiguration(TestSNSConfiguration.class.getName())
+                .withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE);
+
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Disabled("AWS SNS component is failing to parse the sink URL for this one")
     @Test
     @Timeout(value = 90)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            Properties amazonProperties = service.getConnectionProperties();
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        Properties amazonProperties = service.getConnectionProperties();
 
-            ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
-                    .withName("CamelAWSSNSSinkKafkaStyleConnector")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withUrl(queueName)
+        ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic()
+                .withName("CamelAWSSNSSinkKafkaStyleConnector")
+                .withTopics(topicName)
+                .withUrl(queueName)
                     .append("queueUrl", sqsQueueUrl).append("subscribeSNStoSQS", "true")
                     .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
                     .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
                     .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
-                    .append("configuration", "#class:" + TestSNSConfiguration.class.getName()).buildUrl();
+                    .append("configuration", "#class:" + TestSNSConfiguration.class.getName())
+                    .buildUrl();
 
-            runTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("Amazon SNS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
index 45028ae..b38441f 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -18,18 +18,16 @@
 package org.apache.camel.kafkaconnector.aws.v1.sqs.sink;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sqs.model.Message;
 import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.clients.AWSClientUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
@@ -53,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
+public class CamelSinkAWSSQSITCase extends CamelSinkTestSupport {
 
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createSQSService();
@@ -62,6 +60,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
     private AWSSQSClient awssqsClient;
     private String queueName;
     private String queueUrl;
+    private String topicName;
 
     private volatile int received;
     private final int expect = 10;
@@ -73,6 +72,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         awssqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient());
 
         queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000);
@@ -90,6 +90,22 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
         }
     }
 
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail(String.format("Failed to receive the messages within the specified time: received %d of %d", received,
+                    expect));
+        }
+    }
+
     private boolean checkMessages(List<Message> messages) {
         for (Message message : messages) {
             LOG.info("Received: {}", message.getBody());
@@ -104,7 +120,8 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
         return true;
     }
 
-    private void consumeMessages(CountDownLatch latch) {
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
             awssqsClient.receiveFrom(queueUrl, this::checkMessages);
         } catch (Throwable t) {
@@ -114,104 +131,55 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
         }
     }
 
-    private void produceMessages() {
-        try {
-            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-            for (int i = 0; i < expect; i++) {
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
-            }
-        } catch (Throwable t) {
-            LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t);
-            fail(String.format("Unable to publish messages to the broker: %s", t.getMessage()));
-        }
-    }
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
-        LOG.debug("Creating the consumer ...");
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        CountDownLatch latch = new CountDownLatch(1);
-        service.submit(() -> consumeMessages(latch));
-
-        LOG.debug("Creating the producer and sending messages ...");
-        produceMessages();
-
-        LOG.debug("Waiting for the test to complete");
-        if (latch.await(110, TimeUnit.SECONDS)) {
-            assertEquals(expect, received,
-                    "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } else {
-            fail(String.format("Failed to receive the messages within the specified time: received %d of %d", received,
-                    expect));
-        }
-    }
-
     @Test
     @Timeout(value = 120)
-    public void testBasicSendReceive() {
-        try {
-            Properties amazonProperties = awsService.getConnectionProperties();
+    public void testBasicSendReceive() throws Exception {
+        Properties amazonProperties = awsService.getConnectionProperties();
 
-            ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic()
-                    .withName("CamelAwssqsSinkConnectorSpringBootStyle")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withAmazonConfig(amazonProperties)
-                    .withQueueNameOrArn(queueName);
+        ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic()
+                .withName("CamelAwssqsSinkConnectorSpringBootStyle")
+                .withTopics(topicName)
+                .withAmazonConfig(amazonProperties)
+                .withQueueNameOrArn(queueName);
+
+        runTest(testProperties, topicName, expect);
 
-            runTest(testProperties);
-        } catch (Exception e) {
-            LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
     }
 
     @DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote")
     @Timeout(value = 120)
     @RepeatedTest(3)
-    public void testBasicSendReceiveUsingKafkaStyle() {
-        try {
-            Properties amazonProperties = awsService.getConnectionProperties();
+    public void testBasicSendReceiveUsingKafkaStyle() throws Exception {
+        Properties amazonProperties = awsService.getConnectionProperties();
 
-            ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic()
-                    .withName("CamelAwssqsSinkConnectorKafkaStyle")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE)
-                    .withQueueNameOrArn(queueName);
+        ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic()
+                .withName("CamelAwssqsSinkConnectorKafkaStyle")
+                .withTopics(topicName)
+                .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE)
+                .withQueueNameOrArn(queueName);
 
-            runTest(testProperties);
-
-        } catch (Exception e) {
-            LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        runTest(testProperties, topicName, expect);
     }
 
     @DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote")
     @Timeout(value = 120)
     @RepeatedTest(3)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            Properties amazonProperties = awsService.getConnectionProperties();
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        Properties amazonProperties = awsService.getConnectionProperties();
 
-            ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic()
-                    .withName("CamelAwssqsSinkConnectorUsingUrl")
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass())).withUrl(queueName)
+        ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic()
+                .withName("CamelAwssqsSinkConnectorUsingUrl")
+                .withTopics(topicName)
+                .withUrl(queueName)
                     .append("autoCreateQueue", "true")
                     .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
                     .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
                     .append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL))
                     .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
-                    .append("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST)).buildUrl();
+                    .append("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
+                .buildUrl();
 
-            runTest(testProperties);
-
-        } catch (Exception e) {
-            LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        runTest(testProperties, topicName, expect);
     }
 
 }
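
In each of these conversions the hand-rolled produceMessages() loop disappears:
runTest(factory, topicName, expect) now performs the production step in the
base class. Judging from the code removed above, the default behavior is
equivalent to this sketch (an approximation, not the literal base class
implementation):

    KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());

    for (int i = 0; i < count; i++) {
        // testMessageContent(i) lets each test shape its own payload
        kafkaClient.produce(topic, testMessageContent(i));
    }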


[camel-kafka-connector] 08/22: Convert the File tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 6cdd1de1ff135a569a5b913d77f4932fa32afb86
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 12:15:23 2021 +0100

    Convert the File tests to the new reusable sink test base class
---
 .../file/sink/CamelSinkFileITCase.java             | 122 ++++++++++-----------
 1 file changed, 60 insertions(+), 62 deletions(-)

diff --git a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
index 2dbf459..ead6c58 100644
--- a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
+++ b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
@@ -27,13 +27,12 @@ import java.nio.file.StandardWatchEventKinds;
 import java.nio.file.WatchEvent;
 import java.nio.file.WatchKey;
 import java.nio.file.WatchService;
-import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -43,18 +42,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @Testcontainers
-public class CamelSinkFileITCase extends AbstractKafkaTest {
+public class CamelSinkFileITCase extends CamelSinkTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkFileITCase.class);
 
     private static final String SINK_DIR = CamelSinkFileITCase.class.getResource(".").getPath();
     private static final String FILENAME = "test.txt";
 
+    private String topicName;
     private final int expect = 1;
 
     @Override
@@ -64,6 +64,7 @@ public class CamelSinkFileITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         cleanup();
     }
 
@@ -79,74 +80,46 @@ public class CamelSinkFileITCase extends AbstractKafkaTest {
         }
     }
 
-    private void putRecords() {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            try {
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test");
-            } catch (ExecutionException e) {
-                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-            } catch (InterruptedException e) {
-                break;
-            }
-        }
+    @Override
+    protected String testMessageContent(int current) {
+        return "test";
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException, IOException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        putRecords();
-
-        LOG.debug("Created the consumer ... About to receive messages");
-
-        File sinkFile = new File(SINK_DIR, FILENAME);
-        File doneFile = new File(SINK_DIR, FILENAME + ".done");
-
-        waitForFile(sinkFile, doneFile);
-
-        assertTrue(sinkFile.exists(), String.format("The file %s does not exist", sinkFile.getPath()));
-
-        checkFileContents(sinkFile);
-
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
     }
 
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceive() {
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withDirectoryName(SINK_DIR)
-                    .withFileName(FILENAME)
-                    .withDoneFileName(FILENAME + ".done");
+            File sinkFile = new File(SINK_DIR, FILENAME);
+            File doneFile = new File(SINK_DIR, FILENAME + ".done");
 
-            runTest(connectorPropertyFactory);
-
-        } catch (Exception e) {
-            LOG.error("HTTP test failed: {}", e.getMessage(), e);
+            waitForFile(sinkFile, doneFile);
+        } catch (InterruptedException | IOException e) {
+            fail(e.getMessage());
+        } finally {
+            latch.countDown();
         }
     }
 
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withUrl(SINK_DIR)
-                        .append("fileName", FILENAME)
-                        .append("doneFileName", FILENAME + ".done")
-                        .buildUrl();
-
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            File sinkFile = new File(SINK_DIR, FILENAME);
 
-            runTest(connectorPropertyFactory);
+            assertTrue(sinkFile.exists(), String.format("The file %s does not exist", sinkFile.getPath()));
 
-        } catch (Exception e) {
-            LOG.error("HTTP test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
+            try {
+                checkFileContents(sinkFile);
+            } catch (IOException e) {
+                fail(e.getMessage());
+            }
+        } else {
+            fail("Failed to receive the messages within the specified time");
         }
     }
 
@@ -212,4 +185,29 @@ public class CamelSinkFileITCase extends AbstractKafkaTest {
             retries--;
         } while (!doneFile.exists() && retries > 0);
     }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic()
+                .withTopics(topicName)
+                .withDirectoryName(SINK_DIR)
+                .withFileName(FILENAME)
+                .withDoneFileName(FILENAME + ".done");
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelFilePropertyFactory.basic()
+                .withTopics(topicName)
+                .withUrl(SINK_DIR)
+                .append("fileName", FILENAME)
+                .append("doneFileName", FILENAME + ".done")
+                .buildUrl();
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
 }


[camel-kafka-connector] 01/22: Make the AWS v2 sink test support reusable

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit aaf1946e5cba49f9482970e10f54eb8769ea3f50
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 10:53:37 2021 +0100

    Make the AWS v2 sink test support reusable
---
 .../camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java   | 4 ++--
 .../camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java | 4 ++--
 .../camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java | 4 ++--
 .../aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java              | 4 ++--
 .../camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java | 4 ++--
 .../camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java   | 4 ++--
 .../camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java | 4 ++--
 .../org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java   | 5 +++++
 .../camel/kafkaconnector/common/test/CamelSinkTestSupport.java}     | 6 +++---
 9 files changed, 22 insertions(+), 17 deletions(-)

diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
index 71f56f1..9b27827 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java
@@ -24,8 +24,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -49,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport {
+public class CamelSinkAWSCWITCase extends CamelSinkTestSupport {
 
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createCloudWatchService();
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
index 22176b0..ee6e350 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/ec2/sink/CamelSinkAWSEC2ITCase.java
@@ -25,9 +25,9 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
 import org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguration;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -45,7 +45,7 @@ import software.amazon.awssdk.services.ec2.model.InstanceStatus;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class CamelSinkAWSEC2ITCase extends CamelSinkAWSTestSupport {
+public class CamelSinkAWSEC2ITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createEC2Service();
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSEC2ITCase.class);
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
index 3627add..7c212bc 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/iam/sink/CamelSinkAWSIAMITCase.java
@@ -25,9 +25,9 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
 import org.apache.camel.kafkaconnector.aws.v2.cw.sink.TestCloudWatchConfiguration;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -45,7 +45,7 @@ import software.amazon.awssdk.services.iam.model.User;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class CamelSinkAWSIAMITCase extends CamelSinkAWSTestSupport {
+public class CamelSinkAWSIAMITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createIAMService();
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSIAMITCase.class);
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
index 8d9e6c6..b975a3d 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
@@ -25,10 +25,10 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
 import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils;
 import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfiguration;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -53,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
+public class CamelSinkAWSKinesisITCase extends CamelSinkTestSupport {
+public class CamelSinkAWSKinesisITCase  extends CamelSinkTestSupport {
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createKinesisService();
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSKinesisITCase.class);
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
index fffc0e9..4f57799 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kms/sink/CamelSinkAWSKMSITCase.java
@@ -25,8 +25,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -50,7 +50,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSinkAWSKMSITCase extends CamelSinkAWSTestSupport {
+public class CamelSinkAWSKMSITCase extends CamelSinkTestSupport {
 
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createKMSService();
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
index fe64695..ea77a09 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/s3/sink/CamelSinkAWSS3ITCase.java
@@ -25,10 +25,10 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
 import org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils;
 import org.apache.camel.kafkaconnector.aws.v2.s3.common.TestS3Configuration;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -48,7 +48,7 @@ import static org.apache.camel.kafkaconnector.aws.v2.s3.common.S3Utils.deleteBuc
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class CamelSinkAWSS3ITCase extends CamelSinkAWSTestSupport {
+public class CamelSinkAWSS3ITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static AWSService service = AWSServiceFactory.createS3Service();
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSS3ITCase.class);
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
index 5b17a70..85b305b 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -24,8 +24,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient;
-import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.AWSConfigs;
@@ -51,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
+public class CamelSinkAWSSQSITCase extends CamelSinkTestSupport {
 
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createSQSService();
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
index f51e4d0..fb24ca9 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
@@ -21,6 +21,7 @@ import org.apache.camel.kafkaconnector.common.services.kafka.KafkaServiceFactory
 import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectRunnerFactory;
 import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService;
 import org.apache.camel.kafkaconnector.common.utils.PropertyUtils;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -54,4 +55,8 @@ public abstract class AbstractKafkaTest {
     public KafkaConnectService getKafkaConnectService() {
         return kafkaConnectService;
     }
+
+    protected String getTopicForTest(Object testObject) {
+        return TestUtils.getDefaultTestTopic(testObject.getClass()) + "." + TestUtils.randomWithRange(0, 1000);
+    }
 }
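
The getTopicForTest() helper added here appends a random suffix to the
per-class default topic, so each test execution gets its own topic and
repeated runs do not consume records left over by earlier ones. Typical usage,
as seen in the converted tests throughout this series:

    @BeforeEach
    public void setUp() {
        topicName = getTopicForTest(this);
    }
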
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
similarity index 95%
rename from tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
rename to tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
index 6f77de6..d70c3d4 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kafkaconnector.aws.v2.common;
+package org.apache.camel.kafkaconnector.common.test;
 
 import java.time.Duration;
 import java.util.Map;
@@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.fail;
 
-public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSTestSupport.class);
+public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTestSupport.class);
 
     protected abstract Map<String, String> messageHeaders(String text, int current);
 


[camel-kafka-connector] 07/22: Convert the ElasticSearch tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 34dec6e33b7b6324fb1b6d57a400e6f47cebbe50
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 12:07:53 2021 +0100

    Convert the ElasticSearch tests to the new reusable sink test base class
---
 .../sink/CamelSinkElasticSearchITCase.java         | 159 ++++++++-------------
 1 file changed, 60 insertions(+), 99 deletions(-)

diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
index 8358aac..c80f892 100644
--- a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
+++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
@@ -17,16 +17,12 @@
 
 package org.apache.camel.kafkaconnector.elasticsearch.sink;
 
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.elasticsearch.clients.ElasticSearchClient;
 import org.apache.camel.kafkaconnector.elasticsearch.common.ElasticSearchCommon;
 import org.apache.camel.test.infra.elasticsearch.services.ElasticSearchService;
@@ -41,26 +37,28 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
         disabledReason = "Hangs when running with the embedded Kafka Connect instance")
-public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
+public class CamelSinkElasticSearchITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static ElasticSearchService elasticSearch = ElasticSearchServiceFactory.createService();
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkElasticSearchITCase.class);
 
     private ElasticSearchClient client;
+    private String topicName;
 
     private final int expect = 10;
     private int received;
     private final String transformKey = "index-test";
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-elasticsearch-rest-kafka-connector"};
@@ -68,32 +66,45 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         client = new ElasticSearchClient(elasticSearch.getElasticSearchHost(), elasticSearch.getPort(),
                 ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX);
 
         received = 0;
     }
 
-    private void putRecords(CountDownLatch latch) {
-        LOG.debug("Sending records to Kafka");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
-            for (int i = 0; i < expect; i++) {
-                try {
-                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test");
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
-                }
-            }
+            client.waitForIndex();
+
+            LOG.debug("Waiting for data");
+            client.waitForData(expect);
         } finally {
             latch.countDown();
         }
 
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            SearchHits hits = client.getData();
+            assertNotNull(hits);
+
+            hits.forEach(this::verifyHit);
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
     private void verifyHit(SearchHit searchHit) {
         String source = searchHit.getSourceAsString();
 
@@ -107,91 +118,41 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
         received++;
     }
 
-    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
-        propertyFactory.log();
-        LOG.debug("Performing initialization");
-        getKafkaConnectService().initializeConnector(propertyFactory);
-
-        LOG.debug("Initialization complete");
-        CountDownLatch latch = new CountDownLatch(1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(() -> putRecords(latch));
-
-        LOG.debug("Waiting for records");
-        if (!latch.await(30, TimeUnit.SECONDS)) {
-            fail("Timed out wait for data to be added to the Kafka cluster");
-        }
-
-        LOG.debug("Waiting for indices");
-
-        client.waitForIndex();
-
-        LOG.debug("Waiting for data");
-        client.waitForData(expect);
-
-        SearchHits hits = client.getData();
-
-        assertNotNull(hits);
-
-        hits.forEach(this::verifyHit);
-        assertEquals(expect, received, "Did not receive the same amount of messages sent");
-
-        LOG.debug("Created the consumer ... About to receive messages");
-    }
-
 
     @Test
     @Timeout(90)
-    public void testIndexOperation() {
-        try {
-            String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
-            ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
-                    .basic()
-                    .withTopics(topic)
-                    .withOperation("Index")
-                    .withClusterName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
-                    .withHostAddress(elasticSearch.getHttpHostAddress())
-                    .withIndexName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX)
-                    .withTransformsConfig("ElasticSearchTransforms")
-                        .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms")
-                        .withEntry("key", transformKey)
-                        .end();
-
-            runTest(propertyFactory);
-
-            LOG.debug("Created the consumer ... About to receive messages");
-        } catch (Exception e) {
-            LOG.error("ElasticSearch test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testIndexOperation() throws Exception {
+        ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withOperation("Index")
+                .withClusterName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
+                .withHostAddress(elasticSearch.getHttpHostAddress())
+                .withIndexName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX)
+                .withTransformsConfig("ElasticSearchTransforms")
+                    .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms")
+                    .withEntry("key", transformKey)
+                    .end();
+
+        runTest(propertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(90)
-    public void testIndexOperationUsingUrl() {
-        try {
-            String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
-            ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
-                    .basic()
-                    .withTopics(topic)
-                    .withUrl(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
-                        .append("hostAddresses", elasticSearch.getHttpHostAddress())
-                        .append("operation", "Index")
-                        .append("indexName", ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX)
-                        .buildUrl()
-                    .withTransformsConfig("ElasticSearchTransforms")
-                        .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms")
-                        .withEntry("key", transformKey)
-                        .end();
-
-            runTest(propertyFactory);
-
-            LOG.debug("Created the consumer ... About to receive messages");
-        } catch (Exception e) {
-            LOG.error("ElasticSearch test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testIndexOperationUsingUrl() throws Exception {
+        ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withUrl(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER)
+                    .append("hostAddresses", elasticSearch.getHttpHostAddress())
+                    .append("operation", "Index")
+                    .append("indexName", ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX)
+                    .buildUrl()
+                .withTransformsConfig("ElasticSearchTransforms")
+                    .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms")
+                    .withEntry("key", transformKey)
+                    .end();
+
+        runTest(propertyFactory, topicName, expect);
     }
 }


[camel-kafka-connector] 15/22: Convert the SJMS2 tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f2dc1aa69b420ec84e65b949ac15a3959a60e5c5
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 14:52:35 2021 +0100

    Convert the SJMS2 tests to the new reusable sink test base class
---
 .../common/test/CamelSinkTestSupport.java          |  29 ++++-
 .../common/test/TestMessageProducer.java           |  23 ++++
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   |  73 ++++++-------
 .../sjms2/sink/CamelSinkJMSITCase.java             | 118 ++++++++-------------
 4 files changed, 127 insertions(+), 116 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
index 9f8460f..bd02eef 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
@@ -20,6 +20,7 @@ package org.apache.camel.kafkaconnector.common.test;
 import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -60,7 +61,26 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
         }
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception {
+    /**
+     * A simple test runner that follows these steps: initialize the connector, start the consumer, produce the messages, verify the results.
+     *
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param topic the topic to send the messages to
+     * @param count the number of messages to send
+     * @throws Exception For test-specific exceptions
+     */
+    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception {
+        runTest(connectorPropertyFactory, () -> produceMessages(topic, count));
+    }
+
+    /**
+     * A more flexible test runner that can use a custom producer of test messages.
+     * @param connectorPropertyFactory a factory for connector properties
+     * @param producer the test message producer
+     * @throws ExecutionException if the connector fails to initialize
+     * @throws InterruptedException if interrupted while waiting for the test to complete
+     */
+    protected void runTest(ConnectorPropertyFactory connectorPropertyFactory, TestMessageProducer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
 
@@ -70,13 +90,16 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
         CountDownLatch latch = new CountDownLatch(1);
         service.submit(() -> consumeMessages(latch));
 
-        LOG.debug("Creating the producer and sending messages ...");
-        produceMessages(topic, count);
+        producer.producerMessages();
+
+        LOG.debug("Waiting for the messages to be processed");
+        service.shutdown();
 
         LOG.debug("Waiting for the test to complete");
         verifyMessages(latch);
     }
 
+
     protected boolean waitForData() {
         try {
             Thread.sleep(Duration.ofSeconds(1).toMillis());
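
Taken together, the new base class reduces each sink test to a handful of
overrides plus a call to runTest(). A minimal sketch of a hypothetical
subclass (CamelSinkFooITCase and CamelFooPropertyFactory are illustrative
names, not classes from this repository; the hooks and helpers are the ones
added in this series):

    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
    import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.Timeout;

    import static org.junit.jupiter.api.Assertions.fail;

    public class CamelSinkFooITCase extends CamelSinkTestSupport {
        private final int expect = 10;
        private String topicName;

        @Override
        protected String[] getConnectorsInTest() {
            return new String[] {"camel-foo-kafka-connector"}; // hypothetical
        }

        @BeforeEach
        public void setUp() {
            topicName = getTopicForTest(this);
        }

        @Override
        protected Map<String, String> messageHeaders(String text, int current) {
            return null; // no per-message headers in this sketch
        }

        @Override
        protected void consumeMessages(CountDownLatch latch) {
            try {
                // poll the sink system until the expected side effects show up
            } finally {
                latch.countDown(); // always release verifyMessages()
            }
        }

        @Override
        protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
            if (!latch.await(30, TimeUnit.SECONDS)) {
                fail("Failed to receive the messages within the specified time");
            }
        }

        @Test
        @Timeout(90)
        public void testBasicSendReceive() throws Exception {
            ConnectorPropertyFactory factory = CamelFooPropertyFactory
                    .basic()
                    .withTopics(topicName);

            runTest(factory, topicName, expect);
        }
    }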
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
new file mode 100644
index 0000000..dedcf97
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/TestMessageProducer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.test;
+
+@FunctionalInterface
+public interface TestMessageProducer {
+    void producerMessages();
+}
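
Because TestMessageProducer is a @FunctionalInterface, the flexible runTest()
overload accepts any producing routine as a lambda or method reference. Two
illustrative call sites from inside a CamelSinkTestSupport subclass:

    // equivalent to runTest(factory, topicName, expect), spelled out:
    runTest(connectorPropertyFactory, () -> produceMessages(topicName, expect));

    // or a fully custom producer, as the idempotent JMS test below does:
    runTest(connectorPropertyFactory, this::produceMessagesNoProperties);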
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index d2f06a7..432a20a 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -20,9 +20,7 @@ package org.apache.camel.kafkaconnector.sjms2.sink;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
@@ -31,9 +29,9 @@ import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
@@ -55,12 +53,7 @@ import static org.junit.jupiter.api.Assertions.fail;
  * Integration tests for the JMS sink using idempotent features
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
-    @FunctionalInterface
-    interface Producer {
-        void producerMessages();
-    }
-
+public class CamelSinkIdempotentJMSITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
@@ -97,25 +90,13 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
     }
 
-    private boolean checkRecord(Message jmsMessage) {
-        if (jmsMessage instanceof TextMessage) {
-            try {
-                LOG.debug("Received: {}", ((TextMessage) jmsMessage).getText());
-
-                received++;
-
-                return true;
-            } catch (JMSException e) {
-                LOG.error("Failed to read message: {}", e.getMessage(), e);
-                fail("Failed to read message: " + e.getMessage());
-            }
-        }
-
-        return false;
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
     }
 
-
-    private void consumeJMSMessages() {
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         JMSClient jmsClient = null;
 
         try {
@@ -145,31 +126,39 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
             LOG.error("JMS test failed: {}", e.getMessage(), e);
             fail(e.getMessage());
         } finally {
+            latch.countDown();
+
             if (jmsClient != null) {
                 jmsClient.stop();
             }
         }
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory, Producer producer) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        LOG.debug("Creating the consumer ...");
-        service.submit(() -> consumeJMSMessages());
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(25, TimeUnit.SECONDS)) {
+            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received
+                    + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
 
-        producer.producerMessages();
+    private boolean checkRecord(Message jmsMessage) {
+        if (jmsMessage instanceof TextMessage) {
+            try {
+                LOG.debug("Received: {}", ((TextMessage) jmsMessage).getText());
 
-        LOG.debug("Waiting for the messages to be processed");
-        service.shutdown();
+                received++;
 
-        if (service.awaitTermination(25, TimeUnit.SECONDS)) {
-            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } else {
-            fail("Failed to receive the messages within the specified time");
+                return true;
+            } catch (JMSException e) {
+                LOG.error("Failed to read message: {}", e.getMessage(), e);
+                fail("Failed to read message: " + e.getMessage());
+            }
         }
+
+        return false;
     }
 
     private void produceMessagesNoProperties() {
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
index 41b87a8..5e9b66d 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
@@ -17,11 +17,9 @@
 
 package org.apache.camel.kafkaconnector.sjms2.sink;
 
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
@@ -29,10 +27,8 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
 import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
@@ -53,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.fail;
  * Integration tests for the JMS sink
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkJMSITCase extends AbstractKafkaTest {
+public class CamelSinkJMSITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
@@ -62,6 +58,7 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJMSITCase.class);
 
+    private String topicName;
     private int received;
     private final int expect = 10;
 
@@ -83,6 +80,22 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
     public void setUp() {
         LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
         received = 0;
+
+        topicName = getTopicForTest(this);
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(35, TimeUnit.SECONDS)) {
+            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
     }
 
     private boolean checkRecord(Message jmsMessage) {
@@ -106,70 +119,8 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
         return false;
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        CountDownLatch latch = new CountDownLatch(1);
-
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        LOG.debug("Creating the consumer ...");
-        service.submit(() -> consumeJMSMessages(latch));
-
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
-        }
-
-        LOG.debug("Created the consumer ... About to receive messages");
-
-        if (latch.await(35, TimeUnit.SECONDS)) {
-            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } else {
-            fail("Failed to receive the messages within the specified time");
-        }
-    }
-
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withConnectionProperties(connectionProperties())
-                    .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE);
-
-            runTest(connectorPropertyFactory);
-
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-    }
-
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withConnectionProperties(connectionProperties())
-                        .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
-                        .buildUrl();
-
-            runTest(connectorPropertyFactory);
-
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-    }
-
-    private void consumeJMSMessages(CountDownLatch latch) {
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         JMSClient jmsClient = null;
 
         try {
@@ -193,4 +144,29 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
             }
         }
     }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withConnectionProperties(connectionProperties())
+                .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE);
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withConnectionProperties(connectionProperties())
+                    .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
+                    .buildUrl();
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
 }


[camel-kafka-connector] 09/22: Convert the HDFS tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 23310487ad3ca5fa60d7885afb41eb28ae5f79de
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 12:27:06 2021 +0100

    Convert the HDFS tests to the new reusable sink test base class
---
 .../hdfs/sink/CamelSinkHDFSITCase.java             | 95 ++++++++++++----------
 1 file changed, 54 insertions(+), 41 deletions(-)

diff --git a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
index 00234b5..c7e7cc3 100644
--- a/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
+++ b/tests/itests-hdfs/src/test/java/org/apache/camel/kafkaconnector/hdfs/sink/CamelSinkHDFSITCase.java
@@ -19,10 +19,12 @@ package org.apache.camel.kafkaconnector.hdfs.sink;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.hdfs.utils.HDFSEasy;
 import org.apache.camel.test.infra.hdfs.v2.services.HDFSService;
@@ -38,13 +40,12 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkHDFSITCase extends AbstractKafkaTest {
+public class CamelSinkHDFSITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static HDFSService hdfsService = HDFSServiceFactory.createService();
 
@@ -52,6 +53,7 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
 
     private HDFSEasy hdfsEasy;
     private Path currentBasePath;
+    private String topicName;
 
     private final int expect = 10;
 
@@ -60,9 +62,9 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
         return new String[] {"camel-hdfs-kafka-connector"};
     }
 
-
     @BeforeEach
     public void setUp() throws IOException, URISyntaxException {
+        topicName = getTopicForTest(this);
         hdfsEasy = new HDFSEasy(hdfsService.getHDFSHost(), hdfsService.getPort());
 
         String currentPath = "/test" + TestUtils.randomWithRange(0, 256) + "/";
@@ -81,54 +83,51 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
         }
     }
 
-    private boolean filesCreated() {
-        return hdfsEasy.filesCreated(currentBasePath, expect);
+    @Override
+    protected String testMessageContent(int current) {
+        return "Sink test message: " + current;
     }
 
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
-    private String sendKafkaMessages(String baseMessage, int count) throws java.util.concurrent.ExecutionException, InterruptedException {
-        LOG.info("Sending data to Kafka");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < count; i++) {
-            kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), baseMessage + i);
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            TestUtils.waitFor(this::filesCreated);
+        } finally {
+            latch.countDown();
         }
-        return baseMessage;
     }
 
-    @Test
-    @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelHDFSPropertyFactory
-                    .basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withHostname(hdfsService.getHDFSHost())
-                    .withPort(hdfsService.getPort())
-                    .withPath(currentBasePath.getName())
-                    .withSplitStrategy("MESSAGES:1,IDLE:1000");
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            boolean filesCreated = filesCreated();
+            assertTrue(filesCreated, "The files were not created on the remote host");
 
-            connectorPropertyFactory.log();
-            getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+            try {
+                assertEquals(hdfsEasy.countFiles(currentBasePath), expect, "The number of files created vs expected do not match");
 
-            final String baseMessage = "Sink test message: ";
-            sendKafkaMessages(baseMessage, expect);
+                final String baseMessage = "Sink test message: ";
+                hdfsEasy.listFiles(currentBasePath)
+                        .stream()
+                        .filter(f -> !f.getPath().getName().contains(".opened"))
+                        .forEach(f -> printFile(f, baseMessage));
+            } catch (IOException e) {
+                fail(e.getMessage());
+            }
 
-            boolean filesCreated = TestUtils.waitFor(this::filesCreated);
-            assertTrue(filesCreated, "The files were not created on the remote host");
-            assertEquals(hdfsEasy.countFiles(currentBasePath), expect, "The number of files created vs expected do not match");
-            hdfsEasy.listFiles(currentBasePath)
-                    .stream()
-                    .filter(f -> !f.getPath().getName().contains(".opened"))
-                    .forEach(f -> printFile(f, baseMessage));
-
-        } catch (Exception e) {
-            LOG.error("HDFS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
+        } else {
+            fail("Failed to receive the messages within the specified time");
         }
     }
 
-
+    private boolean filesCreated() {
+        return hdfsEasy.filesCreated(currentBasePath, expect);
+    }
 
     private void printFile(LocatedFileStatus f, String matchString) {
         try {
@@ -142,4 +141,18 @@ public class CamelSinkHDFSITCase extends AbstractKafkaTest {
             fail("I/O error: " + e.getMessage());
         }
     }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelHDFSPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withHostname(hdfsService.getHDFSHost())
+                .withPort(hdfsService.getPort())
+                .withPath(currentBasePath.getName())
+                .withSplitStrategy("MESSAGES:1,IDLE:1000");
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
 }
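
A note on the split strategy used above: in the camel-hdfs component,
splitStrategy is a comma-separated list of ST:value pairs, so this setting
should roll the output roughly as follows, which is why the assertions expect
one file per record (a hedged reading, based on the component's documented
MESSAGES/BYTES/IDLE strategies):

    // MESSAGES:1 -> start a new file after every message written
    // IDLE:1000  -> also roll the current file after 1000 ms without writes
    .withSplitStrategy("MESSAGES:1,IDLE:1000");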


[camel-kafka-connector] 06/22: Convert the Couchbase tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 51e74466ad456bc386e3d19245839eafc276331e
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 11:58:20 2021 +0100

    Convert the Couchbase tests to the new reusable sink test base class
---
 .../couchbase/sink/CamelSinkCouchbaseITCase.java   | 100 +++++++++------------
 1 file changed, 43 insertions(+), 57 deletions(-)

diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
index caacfab..15104ac 100644
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
@@ -21,9 +21,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.couchbase.client.core.diagnostics.EndpointPingReport;
@@ -35,9 +32,8 @@ import com.couchbase.client.java.json.JsonObject;
 import com.couchbase.client.java.manager.bucket.BucketSettings;
 import com.couchbase.client.java.query.QueryResult;
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.couchbase.services.CouchbaseService;
 import org.apache.camel.test.infra.couchbase.services.CouchbaseServiceFactory;
@@ -54,7 +50,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /*
@@ -66,7 +61,7 @@ import static org.junit.jupiter.api.Assertions.fail;
  */
 @EnabledIfSystemProperty(named = "enable.flaky.tests", matches = "true")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
+public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static CouchbaseService service = CouchbaseServiceFactory.getService();
 
@@ -99,7 +94,7 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
 
         LOG.debug("Bucket created");
 
-        topic = TestUtils.getDefaultTestTopic(this.getClass()) + TestUtils.randomWithRange(0, 100);
+        topic = getTopicForTest(this);
 
         try {
             String startDelay = System.getProperty("couchbase.test.start.delay", "1000");
@@ -111,18 +106,6 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
         }
     }
 
-    private void checkEndpoints(Map.Entry<ServiceType, List<EndpointPingReport>> entries) {
-        entries.getValue().forEach(this::checkStatus);
-    }
-
-    private void checkStatus(EndpointPingReport endpointPingReport) {
-        if (endpointPingReport.state() == PingState.OK) {
-            LOG.debug("Endpoint {} is ok", endpointPingReport.id());
-        } else {
-            LOG.warn("Endpoint {} is not OK", endpointPingReport.id());
-        }
-    }
-
     @AfterEach
     public void tearDown() {
         LOG.debug("Dropping the test bucket named {}", bucketName);
@@ -132,28 +115,51 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
         cluster.disconnect();
     }
 
-    private void produceMessages(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @Override
+    protected String testMessageContent(int current) {
+        JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", current));
 
-        try {
-            for (int i = 0; i < expect; i++) {
-                Map<String, String> parameters = new HashMap<>();
+        return jsonObject.toString();
+    }
 
-                parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(i));
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> parameters = new HashMap<>();
 
-                JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", i));
+        parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(current));
 
-                try {
-                    kafkaClient.produce(topic, jsonObject.toString(), parameters);
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
-                }
-            }
+        return parameters;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            TestUtils.waitFor(this::waitForMinimumRecordCount);
         } finally {
             latch.countDown();
         }
+
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            verifyRecords();
+        } else {
+            fail("Failed to receive the records within the specified time");
+        }
+    }
+
+    private void checkEndpoints(Map.Entry<ServiceType, List<EndpointPingReport>> entries) {
+        entries.getValue().forEach(this::checkStatus);
+    }
+
+    private void checkStatus(EndpointPingReport endpointPingReport) {
+        if (endpointPingReport.state() == PingState.OK) {
+            LOG.debug("Endpoint {} is ok", endpointPingReport.id());
+        } else {
+            LOG.warn("Endpoint {} is not OK", endpointPingReport.id());
+        }
     }
 
     private boolean waitForMinimumRecordCount() {
@@ -191,26 +197,6 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
         LOG.debug("Received record: {}", results.get(0));
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
-        LOG.debug("Creating the producer and sending messages ...");
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        CountDownLatch latch = new CountDownLatch(1);
-        service.submit(() -> produceMessages(latch));
-
-        assertTrue(TestUtils.waitFor(this::waitForMinimumRecordCount));
-
-        LOG.debug("Waiting for the test to complete");
-        if (latch.await(110, TimeUnit.SECONDS)) {
-            verifyRecords();
-        } else {
-            fail("Failed to receive the records within the specified time");
-        }
-    }
-
     @Disabled("Not formatting the URL correctly - issue #629")
     @Test
     @Timeout(90)
@@ -224,7 +210,7 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
                 .withUsername(service.getUsername())
                 .withPassword(service.getPassword());
 
-        runTest(factory);
+        runTest(factory, topic, expect);
     }
 
     @RepeatedTest(10)
@@ -243,6 +229,6 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
                     .buildUrl();
 
 
-        runTest(factory);
+        runTest(factory, topic, expect);
     }
 }


[camel-kafka-connector] 19/22: Fix error message check for SJMS2 startup on error test

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit b7c040bc7a6be6a686f571f3104bb6d86243ea6a
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Feb 4 08:59:54 2021 +0100

    Fix error message check for SJMS2 startup on error test
---
 .../camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
index f2dac55..735db4b 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSStartupITCase.java
@@ -52,7 +52,7 @@ public class CamelSinkJMSStartupITCase extends AbstractKafkaTest {
         Properties properties = new Properties();
 
         properties.put("camel.component.sjms2.connection-factory", "#class:org.apache.qpid.jms.JmsConnectionFactory");
-        properties.put("camel.component.sjms2.connection-factory.remoteURI", "tcp://invalid");
+        properties.put("camel.component.sjms2.connection-factory.remoteURI", "amqp://invalid");
 
         return properties;
     }
@@ -100,7 +100,7 @@ public class CamelSinkJMSStartupITCase extends AbstractKafkaTest {
         assertFalse(running, "The connector should be in a failed state");
 
         LOG.trace(trace);
-        assertTrue(trace.contains("Failed to resolve endpoint"),
+        assertTrue(trace.contains("Name or service not known"),
                 "Trace should contain a Camel error message");
     }
 


[camel-kafka-connector] 16/22: Convert the SQL tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit fd4565bf9658b19aa815dd0e8c30fe08d5b7cf8e
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 15:38:01 2021 +0100

    Convert the SQL tests to the new reusable sink test base class
---
 .../sql/sink/CamelSinkSQLITCase.java               | 118 ++++++++++-----------
 1 file changed, 55 insertions(+), 63 deletions(-)

diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
index 53fcca7..79bf8f9 100644
--- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
+++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/sink/CamelSinkSQLITCase.java
@@ -22,20 +22,16 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.sql.client.DatabaseClient;
 import org.apache.camel.kafkaconnector.sql.services.TestDataSource;
 import org.apache.camel.test.infra.jdbc.services.JDBCService;
 import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
@@ -47,12 +43,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class CamelSinkSQLITCase extends AbstractKafkaTest {
+public class CamelSinkSQLITCase extends CamelSinkTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSQLITCase.class);
 
     @RegisterExtension
     public JDBCService sqlService;
+    private DatabaseClient client;
 
+    private String topicName;
     private final int expect = 1;
     private int received;
 
@@ -76,30 +74,58 @@ public class CamelSinkSQLITCase extends AbstractKafkaTest {
         return new String[] {"camel-sql-kafka-connector"};
     }
 
-    private void putRecords(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @BeforeEach
+    public void setUp() throws SQLException {
+        topicName = getTopicForTest(this);
+        client = new DatabaseClient(sqlService.jdbcUrl());
+    }
 
-        try {
-            for (int i = 0; i < expect; i++) {
-                Map<String, String> sqlParameters = new HashMap<>();
+    @Override
+    protected String testMessageContent(int current) {
+        return "test";
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> sqlParameters = new HashMap<>();
 
-                // The prefix 'CamelHeader' is removed by the SinkTask
-                sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
-                sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + i);
+        // The prefix 'CamelHeader' is removed by the SinkTask
+        sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+        sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
 
+        return sqlParameters;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            TestUtils.waitFor(() -> {
                 try {
-                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test", sqlParameters);
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
+                    return client.hasAtLeastRecords("test", expect);
+                } catch (SQLException e) {
+                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
+                    return false;
                 }
-            }
+            });
         } finally {
             latch.countDown();
         }
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(25, TimeUnit.SECONDS)) {
+            try {
+                client.runQuery("select * from test", this::verifyData);
+                assertEquals(expect, received, "Did not receive as much data as expected");
+            } catch (SQLException e) {
+                fail(e.getMessage());
+            }
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
     private void verifyData(ResultSet rs) {
         try {
             received++;
@@ -115,48 +141,14 @@ public class CamelSinkSQLITCase extends AbstractKafkaTest {
         }
     }
 
-    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
-        propertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
-
-        CountDownLatch latch = new CountDownLatch(1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(() -> putRecords(latch));
-
-        if (!latch.await(30, TimeUnit.SECONDS)) {
-            fail("Timed out wait for data to be added to the Kafka cluster");
-        }
-
-        LOG.debug("Waiting for indices");
-
-        try {
-            DatabaseClient client = new DatabaseClient(sqlService.jdbcUrl());
-
-            TestUtils.waitFor(() -> {
-                try {
-                    return client.hasAtLeastRecords("test", expect);
-                } catch (SQLException e) {
-                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
-                    return false;
-                }
-            });
-
-            client.runQuery("select * from test", this::verifyData);
-        } catch (SQLException e) {
-            LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-
-        assertEquals(expect, received, "Did not receive the same amount of messages sent");
-        LOG.debug("Created the consumer ... About to receive messages");
-    }
-
     @Test
-    public void testDBFetch() throws ExecutionException, InterruptedException {
-        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory.basic().withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
-            .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)").withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
-
-        runTest(factory);
-
+    public void testDBFetch() throws Exception {
+        CamelSqlPropertyFactory factory = CamelSqlPropertyFactory
+                .basic()
+                .withDataSource(CamelSqlPropertyFactory.classRef(TestDataSource.class.getName()))
+                .withQuery("insert into test(test_name, test_data) values(:#TestName,:#TestData)")
+                .withTopics(topicName);
+
+        runTest(factory, topicName, expect);
     }
 }
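
For readers new to this pattern: as the comment in the diff notes, the sink
task strips the 'CamelHeader' prefix from Kafka record headers, and the
camel-sql component binds the remaining names to the :# placeholders in the
query. A rough sketch of one record's journey (the literal values are
illustrative):

    Map<String, String> sqlParameters = new HashMap<>();
    sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName42");
    sqlParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data 0");

    // CamelSinkTask removes the "CamelHeader" prefix, leaving TestName and
    // TestData on the Camel exchange, which then fill the named parameters in:
    //   insert into test(test_name, test_data) values(:#TestName,:#TestData)
    kafkaClient.produce(topicName, "test", sqlParameters);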


[camel-kafka-connector] 03/22: Convert the Azure storage blob tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e84c8c902e75a307ef766b3508ebe85c1fd70817
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 11:16:48 2021 +0100

    Convert the Azure storage blob tests to the new reusable sink test base class
---
 .../blob/sink/CamelSinkAzureStorageBlobITCase.java | 98 +++++++++++-----------
 .../common/test/CamelSinkTestSupport.java          |  6 +-
 2 files changed, 56 insertions(+), 48 deletions(-)

diff --git a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
index 2b46470..1bbf9f1 100644
--- a/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
+++ b/tests/itests-azure-storage-blob/src/test/java/org/apache/camel/kafkaconnector/azure/storage/blob/sink/CamelSinkAzureStorageBlobITCase.java
@@ -18,19 +18,18 @@
 package org.apache.camel.kafkaconnector.azure.storage.blob.sink;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.models.BlobItem;
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.azure.common.AzureCredentialsHolder;
 import org.apache.camel.test.infra.azure.common.services.AzureService;
@@ -46,18 +45,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
+public class CamelSinkAzureStorageBlobITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static AzureService service = AzureStorageBlobServiceFactory.createAzureService();
-
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAzureStorageBlobITCase.class);
 
     private BlobServiceClient client;
     private BlobContainerClient blobContainerClient;
     private String blobContainerName;
     private Map<String, String> sentData = new HashMap<>();
+    private String topicName;
 
     private int expect = 10;
     private int received;
@@ -69,6 +69,7 @@ public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUpBlob() {
+        topicName = getTopicForTest(this);
         client = AzureStorageBlobClientUtils.getClient();
 
         blobContainerName = "test-" +  TestUtils.randomWithRange(1, 100);
@@ -82,8 +83,45 @@ public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
         }
     }
 
+    @Override
+    protected String testMessageContent(int current) {
+        return "test " + current + " data";
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> messageParameters = new HashMap<>();
+
+        String sentFile = "test " + current;
+
+        sentData.put(sentFile, testMessageContent(current));
+
+        messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageBlobBlobName", sentFile);
+
+        return messageParameters;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            consume();
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(120, TimeUnit.SECONDS)) {
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
     private boolean canConsume() {
-        return blobContainerClient.exists() && blobContainerClient.listBlobs().stream().count() > 0;
+        return blobContainerClient.exists() && blobContainerClient.listBlobs().stream().count() >= expect;
     }
 
 
@@ -111,71 +149,37 @@ public class CamelSinkAzureStorageBlobITCase extends AbstractKafkaTest {
         } while (received != 10 && retries > 0);
     }
 
-
-    private void putRecords() {
-        Map<String, String> messageParameters = new HashMap<>();
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            try {
-                String sentFile = "test " + i;
-                String sentText = "test " + i + " data";
-
-                sentData.put(sentFile, sentText);
-
-                messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAzureStorageBlobBlobName", sentFile);
-
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), sentText, messageParameters);
-            } catch (ExecutionException e) {
-                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-            } catch (InterruptedException e) {
-                break;
-            }
-        }
-    }
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
-        putRecords();
-
-        consume();
-
-        assertEquals(expect, received, "Did not receive the same amount of messages that were sent");
-    }
-
     @Test
     @Timeout(90)
-    public void testBasicSendReceive() throws InterruptedException, ExecutionException, IOException {
+    public void testBasicSendReceive() throws Exception {
         AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
 
         ConnectorPropertyFactory factory = CamelSinkAzureStorageBlobPropertyFactory
                 .basic()
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withTopics(topicName)
                 .withConfiguration(TestBlobConfiguration.class.getName())
                 .withAccessKey(azureCredentialsHolder.accountKey())
                 .withAccountName(azureCredentialsHolder.accountName())
                 .withContainerName(blobContainerName)
                 .withOperation("uploadBlockBlob");
 
-        runTest(factory);
+        runTest(factory, topicName, expect);
     }
 
     @Test
     @Timeout(90)
-    public void testBasicSendReceiveUrl() throws InterruptedException, ExecutionException, IOException {
+    public void testBasicSendReceiveUrl() throws Exception {
         AzureCredentialsHolder azureCredentialsHolder = service.azureCredentials();
 
         ConnectorPropertyFactory factory = CamelSinkAzureStorageBlobPropertyFactory
                 .basic()
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withTopics(topicName)
                 .withConfiguration(TestBlobConfiguration.class.getName())
                 .withUrl(azureCredentialsHolder.accountName() + "/" + blobContainerName)
                     .append("accessKey", azureCredentialsHolder.accountKey())
                     .append("operation", "uploadBlockBlob")
                     .buildUrl();
 
-        runTest(factory);
+        runTest(factory, topicName, expect);
     }
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
index d70c3d4..9f8460f 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSinkTestSupport.java
@@ -36,12 +36,16 @@ public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
 
     protected abstract Map<String, String> messageHeaders(String text, int current);
 
+    protected String testMessageContent(int current) {
+        return  "Sink test message " + current;
+    }
+
     protected void produceMessages(String topicName, int count)  {
         try {
             KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
 
             for (int i = 0; i < count; i++) {
-                String message = "Sink test message " + i;
+                String message = testMessageContent(i);
                 Map<String, String> headers = messageHeaders(message, i);
 
                 if (headers == null) {
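
The new testMessageContent() hook makes the payload overridable in the same
way the headers already are. A connector test that needs structured payloads
could, for instance, override it like this (a hypothetical JSON payload; the
Couchbase conversion above does the same thing with JsonObject):

    @Override
    protected String testMessageContent(int current) {
        // replace the default "Sink test message <current>" plain text
        return "{ \"data\": \"test-" + current + "\" }";
    }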


[camel-kafka-connector] 17/22: Convert the SSH tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit f3cd0607448e328cc9b2121258759721dd1b19a6
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 15:51:28 2021 +0100

    Convert the SSH tests to the new reusable sink test base class
---
 .../ssh/sink/CamelSinkSshITCase.java               | 67 +++++++++++-----------
 1 file changed, 32 insertions(+), 35 deletions(-)

diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
index cf7e9dd..1c71719 100644
--- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
+++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java
@@ -17,18 +17,16 @@
 
 package org.apache.camel.kafkaconnector.ssh.sink;
 
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.ssh.services.SshService;
 import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
@@ -40,46 +38,42 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container",
         disabledReason = "Hangs when running with the embedded Kafka Connect instance")
-public class CamelSinkSshITCase extends AbstractKafkaTest {
+public class CamelSinkSshITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static SshService sshService = SshServiceFactory.createService();
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSshITCase.class);
 
     private final int expect = 3;
+    private String topic;
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-ssh-kafka-connector"};
     }
 
-    private void putRecords(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        try {
-            for (int i = 0; i < expect; i++) {
-                try {
-                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "date");
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
-                }
-            }
-        } finally {
-            latch.countDown();
-        }
+    @BeforeEach
+    public void setUp() {
+        topic = TestUtils.getDefaultTestTopic(this.getClass());
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
+    @Override
+    protected String testMessageContent(int current) {
+        return "date";
+    }
 
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
-        CountDownLatch latch = new CountDownLatch(1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(() -> putRecords(latch));
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        latch.countDown();
+    }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
         if (!latch.await(30, TimeUnit.SECONDS)) {
             fail("Timed out wait for data to be added to the Kafka cluster");
         }
@@ -87,12 +81,15 @@ public class CamelSinkSshITCase extends AbstractKafkaTest {
 
     @Timeout(90)
     @Test
-    public void testSshCommand() throws ExecutionException, InterruptedException {
-        String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
-        ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory.basic().withTopics(topic).withHost(sshService.getSshHost())
-            .withPort(Integer.toString(sshService.getSshPort())).withUsername("root").withPassword("root");
-
-        runTest(connectorPropertyFactory);
+    public void testSshCommand() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSshPropertyFactory
+                .basic()
+                .withTopics(topic)
+                .withHost(sshService.getSshHost())
+                .withPort(Integer.toString(sshService.getSshPort()))
+                .withUsername("root")
+                .withPassword("root");
+
+        runTest(connectorPropertyFactory, topic, expect);
     }
 }
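
Taken together, the overrides that recur in these conversions (testMessageContent, messageHeaders, consumeMessages, verifyMessages) outline the contract of the new base class. The sketch below is inferred from the diffs alone: the abstract method signatures and the runTest(factory, topic, count) entry point all appear in the patches, but the driver internals shown here (the producer loop, its error handling, the choice of initializeConnector over the blocking variant) are assumptions about how the shared code probably works, not the actual CamelSinkTestSupport source.

    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
    import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
    import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
    import org.apache.camel.kafkaconnector.common.utils.TestUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public abstract class CamelSinkTestSupport extends AbstractKafkaTest {
        private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTestSupport.class);

        // Body of the i-th message to produce to Kafka
        protected abstract String testMessageContent(int current);

        // Headers for the i-th message; null means "send without headers"
        protected abstract Map<String, String> messageHeaders(String text, int current);

        // Wait for the sink to pick up the data (or just release the latch
        // when there is nothing to poll, as in the SSH test above)
        protected abstract void consumeMessages(CountDownLatch latch);

        // Await the latch and assert on what the sink produced
        protected abstract void verifyMessages(CountDownLatch latch) throws InterruptedException;

        // Referenced in the JDBC conversion below; presumably a thin wrapper
        protected String getTopicForTest(Object test) {
            return TestUtils.getDefaultTestTopic(test.getClass());
        }

        protected void runTest(ConnectorPropertyFactory factory, String topic, int count) throws Exception {
            factory.log();
            getKafkaConnectService().initializeConnector(factory);

            // Replaces the per-test putRecords() methods removed in the diffs
            CountDownLatch latch = new CountDownLatch(1);
            ExecutorService service = Executors.newCachedThreadPool();
            service.submit(() -> {
                KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
                try {
                    for (int i = 0; i < count; i++) {
                        String message = testMessageContent(i);
                        Map<String, String> headers = messageHeaders(message, i);

                        if (headers == null) {
                            kafkaClient.produce(topic, message);
                        } else {
                            kafkaClient.produce(topic, message, headers);
                        }
                    }
                } catch (Exception e) {
                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
                }
                consumeMessages(latch);
            });

            verifyMessages(latch);
        }
    }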


[camel-kafka-connector] 11/22: Convert the JDBC tests to the new reusable sink test base class

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 114dfe5990237f06042755a717d01354d6f47d0f
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 13:43:45 2021 +0100

    Convert the JDBC tests to the new reusable sink test base class
---
 .../jdbc/sink/CamelSinkJDBCITCase.java             | 121 ++++++++++-----------
 1 file changed, 58 insertions(+), 63 deletions(-)

diff --git a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
index 87752a1..3663890 100644
--- a/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
+++ b/tests/itests-jdbc/src/test/java/org/apache/camel/kafkaconnector/jdbc/sink/CamelSinkJDBCITCase.java
@@ -22,22 +22,19 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.jdbc.client.DatabaseClient;
 import org.apache.camel.kafkaconnector.jdbc.services.TestDataSource;
 import org.apache.camel.test.infra.jdbc.services.JDBCService;
 import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,11 +46,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkJDBCITCase extends AbstractKafkaTest {
+public class CamelSinkJDBCITCase extends CamelSinkTestSupport {
     @RegisterExtension
     static JDBCService jdbcService;
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkJDBCITCase.class);
+    private DatabaseClient client;
+    private String topicName;
 
     private final int expect = 10;
     private int received;
@@ -74,36 +73,69 @@ public class CamelSinkJDBCITCase extends AbstractKafkaTest {
                 .build();
     }
 
+    @BeforeEach
+    public void setUp() throws SQLException {
+        topicName = getTopicForTest(this);
+        client = new DatabaseClient(jdbcService.jdbcUrl());
+        received = 0;
+    }
+
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-jdbc-kafka-connector"};
     }
 
-    private void putRecords(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        String body = "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
+    @Override
+    protected String testMessageContent(int current) {
+        return "insert into test(test_name, test_data) values(:?TestName, :?TestData)";
+    }
 
-        try {
-            for (int i = 0; i < expect; i++) {
-                Map<String, String> jdbcParameters = new HashMap<>();
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> jdbcParameters = new HashMap<>();
 
-                // The prefix 'CamelHeader' is removed by the SinkTask
-                jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
-                jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + i);
+        // The prefix 'CamelHeader' is removed by the SinkTask
+        jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName" + TestUtils.randomWithRange(0, 100));
+        jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data " + current);
 
+        return jdbcParameters;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            LOG.debug("Waiting for indices");
+
+            TestUtils.waitFor(() -> {
                 try {
-                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), body, jdbcParameters);
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
+                    return client.hasAtLeastRecords("test", expect);
+                } catch (SQLException e) {
+                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
+                    return false;
                 }
-            }
+            });
+
         } finally {
             latch.countDown();
         }
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(20, TimeUnit.SECONDS)) {
+            try {
+                client.runQuery("select * from test", this::verifyData);
+
+                assertEquals(expect, received, "Did not receive the same number of messages that were sent");
+            } catch (SQLException e) {
+                fail(e.getMessage());
+            }
+        } else {
+            fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
+                    received, expect));
+        }
+    }
+
     private void verifyData(ResultSet rs) {
         try {
             received++;
@@ -112,58 +144,21 @@ public class CamelSinkJDBCITCase extends AbstractKafkaTest {
 
             assertTrue(testName.startsWith("SomeName"), String.format("Unexpected test name %s", testName));
             assertTrue(testData.startsWith("test data"), String.format("Unexpected test data %s", testData));
-
         } catch (SQLException e) {
             LOG.error("Unable to fetch record from result set: {}", e.getMessage(), e);
             fail(String.format("Unable to fetch record from result set: %s", e.getMessage()));
         }
     }
 
-    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
-        propertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
-
-        CountDownLatch latch = new CountDownLatch(1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(() -> putRecords(latch));
-
-        if (!latch.await(30, TimeUnit.SECONDS)) {
-            fail("Timed out wait for data to be added to the Kafka cluster");
-        }
-
-        LOG.debug("Waiting for indices");
-
-        try {
-            DatabaseClient client = new DatabaseClient(jdbcService.jdbcUrl());
-
-            TestUtils.waitFor(() -> {
-                try {
-                    return client.hasAtLeastRecords("test", expect);
-                } catch (SQLException e) {
-                    LOG.warn("Failed to read the test table: {}", e.getMessage(), e);
-                    return false;
-                }
-            });
-
-            client.runQuery("select * from test", this::verifyData);
-        } catch (SQLException e) {
-            LOG.error("Unable to execute the SQL query: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-
-        assertEquals(expect, received, "Did not receive the same amount of messages sent");
-        LOG.debug("Created the consumer ... About to receive messages");
-    }
-
+    @Timeout(30)
     @Test
-    public void testDBFetch() throws ExecutionException, InterruptedException {
+    public void testDBFetch() throws Exception {
         CamelJDBCPropertyFactory factory = CamelJDBCPropertyFactory.basic()
                 .withDataSource(CamelJDBCPropertyFactory.classRef(TestDataSource.class.getName()))
                 .withDataSourceName("someName")
                 .withUseHeaderAsParameters(true)
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()));
-
-        runTest(factory);
+                .withTopics(topicName);
 
+        runTest(factory, topicName, expect);
     }
 }
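
A detail of the JDBC test above that is easy to miss: with withUseHeaderAsParameters(true), the :?TestName and :?TestData placeholders in the insert statement are filled from Kafka record headers rather than from the message body. A minimal sketch of that pairing, using only names that appear in the diff (the class name and the literal values are made up for illustration):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.camel.kafkaconnector.CamelSinkTask;

    class HeaderAsParametersExample {
        // The message body names its parameters with the :? prefix...
        static final String BODY =
                "insert into test(test_name, test_data) values(:?TestName, :?TestData)";

        // ...and each parameter arrives as a record header whose key carries
        // the 'CamelHeader' prefix; the sink task strips the prefix and hands
        // the remainder ("TestName", "TestData") to the endpoint as a named
        // parameter.
        static Map<String, String> headers() {
            Map<String, String> headers = new HashMap<>();
            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName42");
            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data 0");
            return headers;
        }
    }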