You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/03 17:24:38 UTC
[camel-kafka-connector] 05/18: Convert the Cassandra tests to the new reusable sink test base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 79a5f91e52c023affdaff1121c142113a0fda583
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 11:33:28 2021 +0100
Convert the Cassandra tests to the new reusable sink test base class
---
.../cassandra/sink/CamelSinkCassandraITCase.java | 83 +++++++++-------------
1 file changed, 33 insertions(+), 50 deletions(-)
diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
index 56a0930..2949fff 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
@@ -17,17 +17,14 @@
package org.apache.camel.kafkaconnector.cassandra.sink;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.camel.kafkaconnector.cassandra.clients.CassandraClient;
import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestDataDao;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.test.infra.cassandra.services.CassandraService;
import org.apache.camel.test.infra.cassandra.services.CassandraServiceFactory;
@@ -40,11 +37,11 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkCassandraITCase extends AbstractKafkaTest {
+public class CamelSinkCassandraITCase extends CamelSinkTestSupport {
@RegisterExtension
public static CassandraService cassandraService = CassandraServiceFactory.createService();
@@ -52,6 +49,7 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
private CassandraClient cassandraClient;
private TestDataDao testDataDao;
+ private String topicName;
private final int expect = 10;
private int received;
@@ -63,6 +61,7 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
+ topicName = getTopicForTest(this);
cassandraClient = new CassandraClient(cassandraService.getCassandraHost(), cassandraService.getCQL3Port());
testDataDao = cassandraClient.newTestDataDao();
@@ -70,6 +69,8 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
testDataDao.createKeySpace();
testDataDao.useKeySpace();
testDataDao.createTable();
+
+ received = 0;
}
@AfterEach
@@ -83,81 +84,63 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
}
}
- private void putRecords(CountDownLatch latch) {
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+ @Override
+ protected Map<String, String> messageHeaders(String text, int current) {
+ return null;
+ }
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
try {
- for (int i = 0; i < expect; i++) {
- try {
- kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test " + i);
- } catch (ExecutionException e) {
- LOG.error("Unable to produce messages: {}", e.getMessage(), e);
- } catch (InterruptedException e) {
- break;
- }
+ if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) {
+ fail("Did not receive enough data");
}
+ testDataDao.getData(this::checkRetrievedData);
} finally {
latch.countDown();
}
}
- private void checkRetrievedData(String data) {
- if (data != null) {
- received++;
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ if (latch.await(30, TimeUnit.SECONDS)) {
+ assertEquals(expect, received,
+ "Didn't process the expected amount of messages: " + received + " != " + expect);
+ } else {
+ fail("Failed to receive the messages within the specified time");
}
}
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
- connectorPropertyFactory.log();
-
- getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
- CountDownLatch latch = new CountDownLatch(1);
- ExecutorService service = Executors.newCachedThreadPool();
- service.submit(() -> putRecords(latch));
-
- if (!latch.await(30, TimeUnit.SECONDS)) {
- fail("Timed out wait for data to be added to the Kafka cluster");
- }
-
- if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) {
- fail("Did not receive enough data");
+ private void checkRetrievedData(String data) {
+ if (data != null) {
+ received++;
}
- testDataDao.getData(this::checkRetrievedData);
- assertTrue(received >= expect,
- String.format("Did not receive as much data as expected: %d < %d", received, expect));
-
}
@Timeout(90)
@Test
- public void testFetchFromCassandra() throws ExecutionException, InterruptedException {
- String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
+ public void testFetchFromCassandra() throws Exception {
ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory
.basic()
- .withTopics(topic)
+ .withTopics(topicName)
.withHosts(cassandraService.getCassandraHost())
.withPort(cassandraService.getCQL3Port())
.withKeySpace(TestDataDao.KEY_SPACE)
.withCql(testDataDao.getInsertStatement());
- runTest(connectorPropertyFactory);
+ runTest(connectorPropertyFactory, topicName, expect);
}
@Timeout(90)
@Test
- public void testFetchFromCassandraWithUrl() throws ExecutionException, InterruptedException {
- String topic = TestUtils.getDefaultTestTopic(this.getClass());
-
+ public void testFetchFromCassandraWithUrl() throws Exception {
ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory
.basic()
- .withTopics(topic)
+ .withTopics(topicName)
.withUrl(cassandraService.getCQL3Endpoint(), TestDataDao.KEY_SPACE)
.append("cql", testDataDao.getInsertStatement())
.buildUrl();
- runTest(connectorPropertyFactory);
-
+ runTest(connectorPropertyFactory, topicName, expect);
}
}