You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/10 10:11:47 UTC

[camel-kafka-connector] 07/14: Converted the RabbitMQ source test case to use the reusable source base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit b55096ed893a0d74ec32821b510df2bbd71c68b0
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 20:03:43 2021 +0100

    Converted the RabbitMQ source test case to use the reusable source base class
---
 .../rabbitmq/source/RabbitMQSourceITCase.java      | 64 +++++++++-------------
 1 file changed, 25 insertions(+), 39 deletions(-)

diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
index 15950e7..4ef2ae2 100644
--- a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
@@ -18,14 +18,12 @@ package org.apache.camel.kafkaconnector.rabbitmq.source;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
 import org.apache.camel.test.infra.rabbitmq.services.RabbitMQService;
 import org.apache.camel.test.infra.rabbitmq.services.RabbitMQServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -37,7 +35,7 @@ import org.slf4j.LoggerFactory;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class RabbitMQSourceITCase extends AbstractKafkaTest {
+public class RabbitMQSourceITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static RabbitMQService rabbitmqService = RabbitMQServiceFactory.createService();
 
@@ -45,7 +43,7 @@ public class RabbitMQSourceITCase extends AbstractKafkaTest {
     private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import";
 
     private RabbitMQClient rabbitMQClient;
-    private int received;
+    private String topicName;
     private final int expect = 10;
 
     @Override
@@ -55,55 +53,43 @@ public class RabbitMQSourceITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        received = 0;
+        topicName = getTopicForTest(this);
         rabbitMQClient =  new RabbitMQClient(rabbitmqService.getAmqpUrl());
-    }
-
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
 
-        return true;
-    }
-
-    public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
         rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE);
+    }
 
+    @Override
+    protected void produceTestData() {
         for (int i = 0; i < expect; i++) {
             rabbitMQClient.send(DEFAULT_RABBITMQ_QUEUE, "Test string message");
         }
+    }
 
-        LOG.debug("Creating the kafka consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the kafka consumer ...");
-
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
+
     @Test
     @Timeout(90)
     public void testSource() throws ExecutionException, InterruptedException {
         ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
                 .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withUrl("")
-                .append("username", rabbitmqService.connectionProperties().username())
-                .append("password", rabbitmqService.connectionProperties().password())
-                .append("autoDelete", "false")
-                .append("queue", DEFAULT_RABBITMQ_QUEUE)
-                .append("skipExchangeDeclare", "true")
-                .append("skipQueueBind", "true")
-                .append("hostname", rabbitmqService.connectionProperties().hostname())
-                .append("portNumber", rabbitmqService.connectionProperties().port())
-                .buildUrl();
-
-        runBasicStringTest(factory);
+                    .append("username", rabbitmqService.connectionProperties().username())
+                    .append("password", rabbitmqService.connectionProperties().password())
+                    .append("autoDelete", "false")
+                    .append("queue", DEFAULT_RABBITMQ_QUEUE)
+                    .append("skipExchangeDeclare", "true")
+                    .append("skipQueueBind", "true")
+                    .append("hostname", rabbitmqService.connectionProperties().hostname())
+                    .append("portNumber", rabbitmqService.connectionProperties().port())
+                    .buildUrl();
+
+        runTest(factory, topicName, expect);
     }
 }