You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/10 10:11:46 UTC
[camel-kafka-connector] 06/14: Converted the MongoDB source test
case to use the reusable source base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit e6e7f97b852a81764ce71f359d19bb5f39a24fa0
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 20:00:03 2021 +0100
Converted the MongoDB source test case to use the reusable source base class
---
.../mongodb/source/CamelSourceMongoDBITCase.java | 49 +++++++++-------------
1 file changed, 19 insertions(+), 30 deletions(-)
diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java
index 9260f05..5c4fa5d 100644
--- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java
+++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java
@@ -26,46 +26,41 @@ import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.CreateCollectionOptions;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.test.infra.mongodb.services.MongoDBService;
import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.bson.Document;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Disabled(value = "Disabled due to issue #974")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceMongoDBITCase extends AbstractKafkaTest {
+public class CamelSourceMongoDBITCase extends CamelSourceTestSupport {
@RegisterExtension
public static MongoDBService mongoDBService = MongoDBServiceFactory.createService();
- private static final Logger LOG = LoggerFactory.getLogger(CamelSourceMongoDBITCase.class);
-
private MongoClient mongoClient;
+ private String topicName;
private final int expect = 10;
- private int received;
@Override
protected String[] getConnectorsInTest() {
return new String[]{"camel-mongodb-kafka-connector"};
}
- @BeforeEach
- public void setUp() {
+ @BeforeAll
+ public void setUpDb() {
mongoClient = MongoClients.create(mongoDBService.getReplicaSetUrl());
MongoDatabase database = mongoClient.getDatabase("testDatabase");
@@ -95,25 +90,19 @@ public class CamelSourceMongoDBITCase extends AbstractKafkaTest {
collection.insertMany(documents);
}
- private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
- LOG.debug("Received: {}", record.value());
- received++;
-
- if (received == expect) {
- return false;
- }
-
- return true;
+ @BeforeEach
+ public void setUp() {
+ topicName = getTopicForTest(this);
}
- public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+ @Override
+ protected void produceTestData() {
+ // NO-OP: static data already produced on the DB setup method
+ }
- LOG.debug("Creating the consumer ...");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
- LOG.debug("Created the consumer ...");
+ @Override
+ protected void verifyMessages(TestMessageConsumer<?> consumer) {
+ int received = consumer.consumedMessages().size();
assertEquals(received, expect, "Didn't process the expected amount of messages");
}
@@ -125,13 +114,13 @@ public class CamelSourceMongoDBITCase extends AbstractKafkaTest {
mongoDBService.getReplicaSetUrl());
ConnectorPropertyFactory factory = CamelMongoDBPropertyFactory.basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+ .withKafkaTopic(topicName)
.withConnectionBean("mongo",
BasicConnectorPropertyFactory.classRef(connectionBeanRef))
.withDatabase("testDatabase")
.withCollection("testCollection")
.withCreateCollection(true);
- runTest(factory);
+ runTest(factory, topicName, expect);
}
}