You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/10 10:11:47 UTC
[camel-kafka-connector] 07/14: Converted the RabbitMQ source test case to use the reusable source base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit b55096ed893a0d74ec32821b510df2bbd71c68b0
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 20:03:43 2021 +0100
Converted the RabbitMQ source test case to use the reusable source base class
---
.../rabbitmq/source/RabbitMQSourceITCase.java | 64 +++++++++-------------
1 file changed, 25 insertions(+), 39 deletions(-)
diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
index 15950e7..4ef2ae2 100644
--- a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
@@ -18,14 +18,12 @@ package org.apache.camel.kafkaconnector.rabbitmq.source;
import java.util.concurrent.ExecutionException;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
import org.apache.camel.test.infra.rabbitmq.services.RabbitMQService;
import org.apache.camel.test.infra.rabbitmq.services.RabbitMQServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -37,7 +35,7 @@ import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class RabbitMQSourceITCase extends AbstractKafkaTest {
+public class RabbitMQSourceITCase extends CamelSourceTestSupport {
@RegisterExtension
public static RabbitMQService rabbitmqService = RabbitMQServiceFactory.createService();
@@ -45,7 +43,7 @@ public class RabbitMQSourceITCase extends AbstractKafkaTest {
private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import";
private RabbitMQClient rabbitMQClient;
- private int received;
+ private String topicName;
private final int expect = 10;
@Override
@@ -55,55 +53,43 @@ public class RabbitMQSourceITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
- received = 0;
+ topicName = getTopicForTest(this);
rabbitMQClient = new RabbitMQClient(rabbitmqService.getAmqpUrl());
- }
-
- private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
- LOG.debug("Received: {}", record.value());
- received++;
-
- if (received == expect) {
- return false;
- }
- return true;
- }
-
- public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE);
+ }
+ @Override
+ protected void produceTestData() {
for (int i = 0; i < expect; i++) {
rabbitMQClient.send(DEFAULT_RABBITMQ_QUEUE, "Test string message");
}
+ }
- LOG.debug("Creating the kafka consumer ...");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
- LOG.debug("Created the kafka consumer ...");
-
+ @Override
+ protected void verifyMessages(TestMessageConsumer<?> consumer) {
+ int received = consumer.consumedMessages().size();
assertEquals(received, expect, "Didn't process the expected amount of messages");
}
+
@Test
@Timeout(90)
public void testSource() throws ExecutionException, InterruptedException {
ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withUrl("")
- .append("username", rabbitmqService.connectionProperties().username())
- .append("password", rabbitmqService.connectionProperties().password())
- .append("autoDelete", "false")
- .append("queue", DEFAULT_RABBITMQ_QUEUE)
- .append("skipExchangeDeclare", "true")
- .append("skipQueueBind", "true")
- .append("hostname", rabbitmqService.connectionProperties().hostname())
- .append("portNumber", rabbitmqService.connectionProperties().port())
- .buildUrl();
-
- runBasicStringTest(factory);
+ .append("username", rabbitmqService.connectionProperties().username())
+ .append("password", rabbitmqService.connectionProperties().password())
+ .append("autoDelete", "false")
+ .append("queue", DEFAULT_RABBITMQ_QUEUE)
+ .append("skipExchangeDeclare", "true")
+ .append("skipQueueBind", "true")
+ .append("hostname", rabbitmqService.connectionProperties().hostname())
+ .append("portNumber", rabbitmqService.connectionProperties().port())
+ .buildUrl();
+
+ runTest(factory, topicName, expect);
}
}