Posted to commits@camel.apache.org by or...@apache.org on 2021/02/09 14:39:03 UTC

[camel-kafka-connector] 05/11: Converted the MongoDB source test case to use the reusable source base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit d6f900dbc561c3480a55ba827c5fe5e295079d24
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 20:00:03 2021 +0100

    Converted the MongoDB source test case to use the reusable source base class
---
 .../mongodb/source/CamelSourceMongoDBITCase.java   | 49 +++++++++-------------
 1 file changed, 19 insertions(+), 30 deletions(-)
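
For readers who haven't seen the reusable base class before, the conversion below amounts to implementing the hooks exposed by CamelSourceTestSupport instead of hand-rolling a KafkaClient consumer. The following is an illustrative sketch of that contract, put together only from the calls visible in this diff; the class name ExampleSourceITCase, the connector name, the test method and the createPropertyFactory() helper are placeholders, and the exact signatures in the common test module may differ:

    import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
    import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
    import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.TestInstance;

    import static org.junit.jupiter.api.Assertions.assertEquals;

    @TestInstance(TestInstance.Lifecycle.PER_CLASS)
    public class ExampleSourceITCase extends CamelSourceTestSupport {
        private final int expect = 10;
        private String topicName;

        @Override
        protected String[] getConnectorsInTest() {
            // connector artifact(s) the test needs on the plugin path (placeholder name)
            return new String[]{"camel-example-kafka-connector"};
        }

        @BeforeEach
        public void setUp() {
            // per-test Kafka topic name supplied by the base class
            topicName = getTopicForTest(this);
        }

        @Override
        protected void produceTestData() {
            // seed the system the source endpoint reads from; may be a no-op
            // when the data is created elsewhere (as in the MongoDB test below)
        }

        @Override
        protected void verifyMessages(TestMessageConsumer<?> consumer) {
            // assert on the records consumed from the Kafka topic
            assertEquals(expect, consumer.consumedMessages().size(),
                    "Didn't process the expected amount of messages");
        }

        @Test
        public void testSourceConnector() throws Exception {
            // build the connector configuration with the system-specific property factory
            // (e.g. CamelMongoDBPropertyFactory.basic()...withKafkaTopic(topicName), as below)
            ConnectorPropertyFactory factory = createPropertyFactory();

            // hand over to the base-class test driver; expect `expect` records on topicName
            runTest(factory, topicName, expect);
        }

        // placeholder helper: return a concrete ConnectorPropertyFactory for the connector under test
        private ConnectorPropertyFactory createPropertyFactory() {
            throw new UnsupportedOperationException("replace with a concrete *PropertyFactory");
        }
    }

Judging from the runTest() implementation removed below, the base class's runTest(factory, topicName, expect) starts the connector, invokes produceTestData(), consumes from the given topic and hands the consumed records to verifyMessages(), which is why the explicit KafkaClient, checkRecord() and logger code disappears from the test.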

diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java
index 9d09fce..671c6e1 100644
--- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java
+++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java
@@ -26,44 +26,39 @@ import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.CreateCollectionOptions;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.test.infra.mongodb.services.MongoDBService;
 import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.bson.Document;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceMongoDBITCase extends AbstractKafkaTest {
+public class CamelSourceMongoDBITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static MongoDBService mongoDBService = MongoDBServiceFactory.createService();
 
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceMongoDBITCase.class);
-
     private MongoClient mongoClient;
+    private String topicName;
 
     private final int expect = 10;
-    private int received;
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[]{"camel-mongodb-kafka-connector"};
     }
 
-    @BeforeEach
-    public void setUp() {
+    @BeforeAll
+    public void setUpDb() {
         mongoClient = MongoClients.create(mongoDBService.getReplicaSetUrl());
 
         MongoDatabase database = mongoClient.getDatabase("testDatabase");
@@ -93,25 +88,19 @@ public class CamelSourceMongoDBITCase extends AbstractKafkaTest {
         collection.insertMany(documents);
     }
 
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+    @Override
+    protected void produceTestData() {
+        // NO-OP: static data already produced on the DB setup method
+    }
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
@@ -123,13 +112,13 @@ public class CamelSourceMongoDBITCase extends AbstractKafkaTest {
                 mongoDBService.getReplicaSetUrl());
 
         ConnectorPropertyFactory factory = CamelMongoDBPropertyFactory.basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withConnectionBean("mongo",
                         BasicConnectorPropertyFactory.classRef(connectionBeanRef))
                 .withDatabase("testDatabase")
                 .withCollection("testCollection")
                 .withCreateCollection(true);
 
-        runTest(factory);
+        runTest(factory, topicName, expect);
     }
 }
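
A note on the shape of the converted test: the MongoDB documents are inserted once per class in the @BeforeAll method, so produceTestData() is deliberately a no-op and the per-test work is reduced to resolving the topic name in @BeforeEach and counting the consumed records in verifyMessages(). The existing @TestInstance(TestInstance.Lifecycle.PER_CLASS) annotation is what allows setUpDb() to be a non-static @BeforeAll method on the instance that owns the MongoClient field.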