You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/04 14:12:55 UTC

[camel-kafka-connector] 12/22: Convert the MongoDB tests to the new reusable sink test base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 66ef021abdb27afdc31af09b82bb80612034c363
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 13:53:46 2021 +0100

    Convert the MongoDB tests to the new reusable sink test base class
---
 .../mongodb/sink/CamelSinkMongoDBITCase.java       | 76 ++++++++++++----------
 1 file changed, 42 insertions(+), 34 deletions(-)

diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
index eb4cf2f..da1b02a 100644
--- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
+++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
@@ -17,16 +17,16 @@
 
 package org.apache.camel.kafkaconnector.mongodb.sink;
 
-import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.mongodb.services.MongoDBService;
 import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory;
@@ -43,13 +43,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkMongoDBITCase extends AbstractKafkaTest {
+public class CamelSinkMongoDBITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static MongoDBService mongoDBService = MongoDBServiceFactory.createService();
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelMongoDBPropertyFactory.class);
 
     private MongoClient mongoClient;
+    private String topicName;
+    private final String databaseName = "testDB";
+    private final String collectionName = "testRecords";
 
     private final int expect = 10;
 
@@ -61,28 +64,44 @@ public class CamelSinkMongoDBITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         mongoClient = MongoClients.create(mongoDBService.getReplicaSetUrl());
     }
 
-    private void putRecords() {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @Override
+    protected String testMessageContent(int current) {
+        return String.format("{\"test\": \"value %d\"}", current);
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
-            for (int i = 0; i < expect; i++) {
-                String data = String.format("{\"test\": \"value %d\"}", i);
-
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), data);
-            }
-
-        } catch (ExecutionException e) {
-            LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOG.warn("The thread putting records to Kafka was interrupted");
-            fail("The thread putting records to Kafka was interrupted");
+            MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);
+            MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName);
+
+            LOG.info("Waiting for data on the MongoDB instance");
+            TestUtils.waitFor(() -> hasAllRecords(collection));
+        } finally {
+            latch.countDown();
         }
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(15, TimeUnit.SECONDS)) {
+            String databaseName = "testDB";
+            String collectionName = "testRecords";
+
+            verifyDocuments(databaseName, collectionName);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
     private boolean hasAllRecords(MongoCollection<Document> collection) {
         return collection.countDocuments() >= expect;
     }
@@ -91,34 +110,23 @@ public class CamelSinkMongoDBITCase extends AbstractKafkaTest {
         MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
         MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName);
 
-        TestUtils.waitFor(() -> hasAllRecords(collection));
-
         assertEquals(expect, collection.countDocuments());
     }
 
-    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
-        propertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
-
-        putRecords();
-    }
-
     @Test
-    @Timeout(90)
-    public void testBasicSendReceive() throws ExecutionException, InterruptedException {
+    @Timeout(30)
+    public void testBasicSendReceive() throws Exception {
         String connectionBeanRef = String.format("com.mongodb.client.MongoClients#create('%s')",
                 mongoDBService.getReplicaSetUrl());
 
         CamelMongoDBPropertyFactory factory = CamelMongoDBPropertyFactory.basic()
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withTopics(topicName)
                 .withConnectionBean("mongo",
                         BasicConnectorPropertyFactory.classRef(connectionBeanRef))
                 .withDatabase("testDB")
                 .withCollection("testRecords")
                 .withOperation("insert");
 
-        runTest(factory);
-
-        verifyDocuments("testDB", "testRecords");
+        runTest(factory, topicName, expect);
     }
 }