You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/03 17:24:38 UTC

[camel-kafka-connector] 05/18: Convert the Cassandra tests to the new reusable sink test base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 79a5f91e52c023affdaff1121c142113a0fda583
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 11:33:28 2021 +0100

    Convert the Cassandra tests to the new reusable sink test base class
---
 .../cassandra/sink/CamelSinkCassandraITCase.java   | 83 +++++++++-------------
 1 file changed, 33 insertions(+), 50 deletions(-)

diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
index 56a0930..2949fff 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
@@ -17,17 +17,14 @@
 
 package org.apache.camel.kafkaconnector.cassandra.sink;
 
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.cassandra.clients.CassandraClient;
 import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestDataDao;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.cassandra.services.CassandraService;
 import org.apache.camel.test.infra.cassandra.services.CassandraServiceFactory;
@@ -40,11 +37,11 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkCassandraITCase extends AbstractKafkaTest {
+public class CamelSinkCassandraITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static CassandraService cassandraService = CassandraServiceFactory.createService();
 
@@ -52,6 +49,7 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
 
     private CassandraClient cassandraClient;
     private TestDataDao testDataDao;
+    private String topicName;
 
     private final int expect = 10;
     private int received;
@@ -63,6 +61,7 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         cassandraClient = new CassandraClient(cassandraService.getCassandraHost(), cassandraService.getCQL3Port());
 
         testDataDao = cassandraClient.newTestDataDao();
@@ -70,6 +69,8 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
         testDataDao.createKeySpace();
         testDataDao.useKeySpace();
         testDataDao.createTable();
+
+        received = 0;
     }
 
     @AfterEach
@@ -83,81 +84,63 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
         }
     }
 
-    private void putRecords(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
-            for (int i = 0; i < expect; i++) {
-                try {
-                    kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test " + i);
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
-                }
+            if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) {
+                fail("Did not receive enough data");
             }
+            testDataDao.getData(this::checkRetrievedData);
         } finally {
             latch.countDown();
         }
     }
 
-    private void checkRetrievedData(String data) {
-        if (data != null) {
-            received++;
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
         }
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        CountDownLatch latch = new CountDownLatch(1);
-        ExecutorService service = Executors.newCachedThreadPool();
-        service.submit(() -> putRecords(latch));
-
-        if (!latch.await(30, TimeUnit.SECONDS)) {
-            fail("Timed out wait for data to be added to the Kafka cluster");
-        }
-
-        if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) {
-            fail("Did not receive enough data");
+    private void checkRetrievedData(String data) {
+        if (data != null) {
+            received++;
         }
-        testDataDao.getData(this::checkRetrievedData);
-        assertTrue(received >= expect,
-                String.format("Did not receive as much data as expected: %d < %d", received, expect));
-
     }
 
     @Timeout(90)
     @Test
-    public void testFetchFromCassandra() throws ExecutionException, InterruptedException {
-        String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
+    public void testFetchFromCassandra() throws Exception {
         ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory
                 .basic()
-                .withTopics(topic)
+                .withTopics(topicName)
                 .withHosts(cassandraService.getCassandraHost())
                 .withPort(cassandraService.getCQL3Port())
                 .withKeySpace(TestDataDao.KEY_SPACE)
                 .withCql(testDataDao.getInsertStatement());
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Timeout(90)
     @Test
-    public void testFetchFromCassandraWithUrl() throws ExecutionException, InterruptedException {
-        String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
+    public void testFetchFromCassandraWithUrl() throws Exception {
         ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory
                 .basic()
-                    .withTopics(topic)
+                    .withTopics(topicName)
                     .withUrl(cassandraService.getCQL3Endpoint(), TestDataDao.KEY_SPACE)
                     .append("cql", testDataDao.getInsertStatement())
                     .buildUrl();
 
-        runTest(connectorPropertyFactory);
-
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }