You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/03/17 10:02:52 UTC
[camel-spring-boot] branch main updated: [CAMEL-17801] Add tests in camel-mongodb-starter
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
The following commit(s) were added to refs/heads/main by this push:
new 1214b25 [CAMEL-17801] Add tests in camel-mongodb-starter
1214b25 is described below
commit 1214b25a2de7449286ae32f33c4b7c991919e318
Author: Croway <fe...@gmail.com>
AuthorDate: Tue Mar 15 18:03:05 2022 +0100
[CAMEL-17801] Add tests in camel-mongodb-starter
---
components-starter/camel-mongodb-starter/pom.xml | 76 ++++
.../integration/AbstractMongoDbITSupport.java | 179 ++++++++
.../integration/MongoDbAggregateOperationIT.java | 149 +++++++
.../integration/MongoDbBigDecimalConverterIT.java | 88 ++++
.../integration/MongoDbBulkWriteOperationIT.java | 155 +++++++
.../MongoDbChangeStreamsConsumerIT.java | 178 ++++++++
.../integration/MongoDbConnectionBeansIT.java | 101 +++++
.../mongodb/integration/MongoDbConversionsIT.java | 184 ++++++++
.../MongoDbCredentialsFromUriConnectionIT.java | 131 ++++++
.../mongodb/integration/MongoDbDynamicityIT.java | 188 +++++++++
.../integration/MongoDbExceptionHandlingIT.java | 124 ++++++
.../integration/MongoDbFindOperationIT.java | 460 ++++++++++++++++++++
.../integration/MongoDbHeaderHandlingIT.java | 160 +++++++
.../mongodb/integration/MongoDbIndexIT.java | 244 +++++++++++
.../mongodb/integration/MongoDbInsertBatchIT.java | 101 +++++
.../mongodb/integration/MongoDbOperationsIT.java | 460 ++++++++++++++++++++
.../mongodb/integration/MongoDbOutputTypeIT.java | 167 ++++++++
.../integration/MongoDbReadPreferenceOptionIT.java | 99 +++++
.../integration/MongoDbSpringDslOperationsIT.java | 44 ++
.../mongodb/integration/MongoDbStopEndpointIT.java | 82 ++++
.../MongoDbTailableCursorConsumerIT.java | 463 +++++++++++++++++++++
.../meta/integration/MongoDbMetaExtensionIT.java | 174 ++++++++
.../idempotent/MongoDbIdempotentRepositoryIT.java | 127 ++++++
.../integration/MongoDbVerifierExtensionIT.java | 151 +++++++
.../src/test/resources/log4j2.properties | 33 ++
.../src/test/resources/mongodb.test.properties | 23 +
.../component/mongodb/mongoBasicOperationsTest.xml | 81 ++++
27 files changed, 4422 insertions(+)
diff --git a/components-starter/camel-mongodb-starter/pom.xml b/components-starter/camel-mongodb-starter/pom.xml
index 7909167..938e1d7 100644
--- a/components-starter/camel-mongodb-starter/pom.xml
+++ b/components-starter/camel-mongodb-starter/pom.xml
@@ -39,6 +39,20 @@
<artifactId>camel-mongodb</artifactId>
<version>${camel-version}</version>
</dependency>
+ <!-- test infra -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-infra-mongodb</artifactId>
+ <version>${camel-version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3-version}</version>
+ <scope>test</scope>
+ </dependency>
<!--START OF GENERATED CODE-->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
@@ -46,4 +60,66 @@
</dependency>
<!--END OF GENERATED CODE-->
</dependencies>
+ <profiles>
+ <!-- activate integration test if the docker socket file is accessible -->
+ <profile>
+ <id>mongodb-integration-tests-docker-file</id>
+ <activation>
+ <file>
+ <exists>/var/run/docker.sock</exists>
+ </file>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <visibleassertions.silence>true</visibleassertions.silence>
+ </systemPropertyVariables>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <!-- activate integration test if the DOCKER_HOST env var is set -->
+ <profile>
+ <id>mongodb-integration-tests-docker-env</id>
+ <activation>
+ <property>
+ <name>env.DOCKER_HOST</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <visibleassertions.silence>true</visibleassertions.silence>
+ </systemPropertyVariables>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/AbstractMongoDbITSupport.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/AbstractMongoDbITSupport.java
new file mode 100644
index 0000000..46ac9aa
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/AbstractMongoDbITSupport.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.mongodb.CamelMongoDbException;
+import org.apache.camel.component.mongodb.MongoDbComponent;
+import org.apache.camel.test.infra.mongodb.services.MongoDBService;
+import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import org.bson.Document;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+import java.util.Formatter;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public abstract class AbstractMongoDbITSupport {
+
+ protected static final String SCHEME = "mongodb";
+ protected static final String USER = "test-user";
+ protected static final String PASSWORD = "test-pwd";
+ @RegisterExtension
+ public static MongoDBService service = MongoDBServiceFactory.createService();
+ protected static MongoClient mongo;
+ protected static MongoDatabase db;
+ protected static MongoCollection<Document> testCollection;
+ protected static MongoCollection<Document> dynamicCollection;
+
+ protected static String dbName = "test";
+ protected static String testCollectionName;
+ protected static String dynamicCollectionName;
+ protected ProducerTemplate template;
+ @Autowired
+ protected CamelContext context;
+
+ @BeforeAll
+ public static void beforeAll() {
+ mongo = MongoClients.create(service.getReplicaSetUrl());
+ db = mongo.getDatabase(dbName);
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ // Refresh the test collection - drop it and recreate it. We don't do
+ // this for the database because MongoDB would create large
+ // store files each time
+ testCollectionName = "camelTest";
+ testCollection = db.getCollection(testCollectionName, Document.class);
+ testCollection.drop();
+ testCollection = db.getCollection(testCollectionName, Document.class);
+
+ dynamicCollectionName = testCollectionName.concat("Dynamic");
+ dynamicCollection = db.getCollection(dynamicCollectionName, Document.class);
+ dynamicCollection.drop();
+ dynamicCollection = db.getCollection(dynamicCollectionName, Document.class);
+
+ template = context.createProducerTemplate();
+
+ context.getPropertiesComponent().setLocation("classpath:mongodb.test.properties");
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ testCollection.drop();
+ dynamicCollection.drop();
+ }
+
+ /**
+ * Useful to simulate the presence of an authenticated user with name {@value #USER} and password {@value #PASSWORD}
+ */
+ protected void createAuthorizationUser() {
+ createAuthorizationUser("admin", USER, PASSWORD);
+ }
+
+ protected void createAuthorizationUser(String database, String user, String password) {
+ MongoDatabase adminDb = mongo.getDatabase("admin");
+ MongoCollection<Document> usersCollection = adminDb.getCollection("system.users");
+ if (usersCollection.countDocuments(new Document("user", user)) == 0) {
+ MongoDatabase db = mongo.getDatabase(database);
+ Map<String, Object> commandArguments = new LinkedHashMap<>();
+ commandArguments.put("createUser", user);
+ commandArguments.put("pwd", password);
+ String[] roles = {"readWrite"};
+ commandArguments.put("roles", roles);
+ BasicDBObject command = new BasicDBObject(commandArguments);
+ db.runCommand(command);
+ }
+ }
+
+ protected void pumpDataIntoTestCollection() {
+ // there should be 100 of each
+ String[] scientists
+ = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
+ for (int i = 1; i <= 1000; i++) {
+ int index = i % scientists.length;
+ Formatter f = new Formatter();
+ String doc
+ = f.format("{\"_id\":\"%d\", \"scientist\":\"%s\", \"fixedField\": \"fixedValue\"}", i, scientists[index])
+ .toString();
+ IOHelper.close(f);
+ testCollection.insertOne(Document.parse(doc));
+ }
+ assertEquals(1000L, testCollection.countDocuments(), "Data pumping of 1000 entries did not complete entirely");
+ }
+
+ protected CamelMongoDbException extractAndAssertCamelMongoDbException(Object result, String message) {
+ assertTrue(result instanceof Throwable, "Result is not an Exception");
+ assertTrue(result instanceof CamelExecutionException, "Result is not an CamelExecutionException");
+ Throwable exc = ((CamelExecutionException) result).getCause();
+ assertTrue(exc instanceof CamelMongoDbException, "Result is not an CamelMongoDbException");
+ CamelMongoDbException camelExc = ObjectHelper.cast(CamelMongoDbException.class, exc);
+ if (message != null) {
+ assertTrue(camelExc.getMessage().contains(message), "CamelMongoDbException doesn't contain desired message string");
+ }
+ return camelExc;
+ }
+
+ protected MockEndpoint getMockEndpoint(String endpoint) {
+ return context.getEndpoint(endpoint, MockEndpoint.class);
+ }
+
+ @Configuration
+ public class MongoConfiguration {
+
+ @Bean
+ public MongoClient mongoClient() {
+ return mongo;
+ }
+
+ @Bean
+ public MongoDbComponent mongoDbComponent() {
+ MongoDbComponent component = new MongoDbComponent();
+ component.setMongoConnection(mongo);
+
+ return component;
+ }
+
+ @Bean
+ public void propertyLocation() {
+ context.getPropertiesComponent().setLocation("classpath:mongodb.test.properties");
+ }
+ }
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbAggregateOperationIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbAggregateOperationIT.java
new file mode 100644
index 0000000..09c37ca
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbAggregateOperationIT.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.mongodb.client.MongoIterable;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.test.junit5.TestSupport.assertListSize;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbAggregateOperationIT.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class,
+ MongoDbAggregateOperationIT.TestConfiguration.class
+ }
+)
+public class MongoDbAggregateOperationIT extends AbstractMongoDbITSupport {
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:aggregate")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate");
+ from("direct:aggregateDBCursor")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate&dynamicity=true&outputType=MongoIterable")
+ .to("mock:resultAggregateDBCursor");
+ }
+ };
+ }
+ }
+
+ @Test
+ public void testAggregate() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ // result sorted by _id
+ Object result = template
+ .requestBody("direct:aggregate",
+ "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},"
+ + "{ $group: { _id: \"$scientist\", count: { $sum: 1 }} },{ $sort : { _id : 1}} ]");
+
+ assertTrue(result instanceof List, "Result is not of type List");
+
+ @SuppressWarnings("unchecked")
+ List<Document> resultList = (List<Document>) result;
+ assertListSize("Result does not contain 2 elements", resultList, 2);
+
+ assertEquals("Darwin", resultList.get(0).get("_id"), "First result Document._id should be Darwin");
+ assertEquals(100, resultList.get(0).get("count"), "First result Document.count should be 100");
+ assertEquals("Einstein", resultList.get(1).get("_id"), "Second result Document._id should be Einstein");
+ assertEquals(100, resultList.get(1).get("count"), "Second result Document.count should be 100");
+ }
+
+ @Test
+ public void testAggregateDBCursor() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ Object result = template
+ .requestBody("direct:aggregateDBCursor",
+ "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}}]");
+ // the direct:aggregateDBCursor route sets outputType=MongoIterable, so that is the type asserted here
+ assertTrue(result instanceof MongoIterable, "Result is not of type MongoIterable");
+
+ MongoIterable<Document> resultCursor = (MongoIterable<Document>) result;
+ // Ensure that all returned documents contain all fields
+ int count = 0;
+ for (Document document : resultCursor) {
+ assertNotNull(document.get("_id"), "Document in returned list should contain all fields");
+ assertNotNull(document.get("scientist"), "Document in returned list should contain all fields");
+ assertNotNull(document.get("fixedField"), "Document in returned list should contain all fields");
+ count++;
+ }
+ assertEquals(200, count, "Result does not contain 200 elements");
+ }
+
+ @Test
+ public void testAggregateWithOptions() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ Map<String, Object> options = new HashMap<>();
+ options.put(MongoDbConstants.BATCH_SIZE, 10);
+ options.put(MongoDbConstants.ALLOW_DISK_USE, true);
+
+ Object result = template
+ .requestBodyAndHeaders("direct:aggregateDBCursor",
+ "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}}]", options);
+ // the direct:aggregateDBCursor route sets outputType=MongoIterable, so that is the type asserted here
+ assertTrue(result instanceof MongoIterable, "Result is not of type MongoIterable");
+
+ MongoIterable<Document> resultCursor = (MongoIterable<Document>) result;
+
+ // Ensure that all returned documents contain all fields
+ int count = 0;
+ for (Document document : resultCursor) {
+ assertNotNull(document.get("_id"), "Document in returned list should contain all fields");
+ assertNotNull(document.get("scientist"), "Document in returned list should contain all fields");
+ assertNotNull(document.get("fixedField"), "Document in returned list should contain all fields");
+ count++;
+ }
+ assertEquals(200, count, "Result does not contain 200 elements");
+ }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbBigDecimalConverterIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbBigDecimalConverterIT.java
new file mode 100644
index 0000000..4e42660
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbBigDecimalConverterIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import java.math.BigDecimal;
+
+import com.mongodb.BasicDBObject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbBigDecimalConverterIT.class,
+ MongoDbBigDecimalConverterIT.TestConfiguration.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbBigDecimalConverterIT extends AbstractMongoDbITSupport {
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:insert")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+ }
+ };
+ }
+ }
+
+ private class NumberClass {
+ // CHECKSTYLE:OFF
+ public String _id = "testBigDecimalConvert";
+ // CHECKSTYLE:ON
+
+ public BigDecimal aNumber = new BigDecimal(0);
+
+ public BigDecimal bNumber = new BigDecimal(12345L);
+ }
+
+ @Test
+ public void testBigDecimalAutoConversion() {
+ assertEquals(0, testCollection.countDocuments());
+ NumberClass testClass = new NumberClass();
+ Object result = template.requestBody("direct:insert", testClass);
+ assertTrue(result instanceof Document);
+ Document b = testCollection.find(new BasicDBObject("_id", testClass._id)).first();
+ assertNotNull(b, "No record with 'testBigDecimalConvert' _id");
+ // expected value first, then the value read back (cast to double here) from the stored document
+ assertEquals(testClass.aNumber, new BigDecimal((double) b.get("aNumber")));
+ assertEquals(testClass.bNumber, new BigDecimal((double) b.get("bNumber")));
+ }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbBulkWriteOperationIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbBulkWriteOperationIT.java
new file mode 100644
index 0000000..43a4a87
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbBulkWriteOperationIT.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.mongodb.bulk.BulkWriteResult;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.WriteModel;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbBulkWriteOperationIT.class,
+ MongoDbBulkWriteOperationIT.TestConfiguration.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbBulkWriteOperationIT extends AbstractMongoDbITSupport {
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:bulkWrite").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=bulkWrite");
+ from("direct:unorderedBulkWrite").setHeader(MongoDbConstants.BULK_ORDERED).constant(false)
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=bulkWrite");
+ }
+ };
+ }
+ }
+
+ @Test
+ public void testBulkWrite() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+ List<WriteModel<Document>> bulkOperations = Arrays
+ .asList(new InsertOneModel<>(new Document("scientist", "Pierre Curie")),
+ new UpdateOneModel<>(
+ new Document("_id", "2"),
+ new Document("$set", new Document("scientist", "Charles Darwin"))),
+ new UpdateManyModel<>(
+ new Document("scientist", "Curie"),
+ new Document("$set", new Document("scientist", "Marie Curie"))),
+ new ReplaceOneModel<>(new Document("_id", "1"), new Document("scientist", "Albert Einstein")),
+ new DeleteOneModel<>(new Document("_id", "3")),
+ new DeleteManyModel<>(new Document("scientist", "Bohr")));
+
+ BulkWriteResult result = template.requestBody("direct:bulkWrite", bulkOperations, BulkWriteResult.class);
+
+ assertNotNull(result);
+ // 1 insert
+ assertEquals(1, result.getInsertedCount(), "Records inserted should be 1 : ");
+ // 1 updateOne + 100 updateMany + 1 replaceOne
+ assertEquals(102, result.getMatchedCount(), "Records matched should be 102 : ");
+ assertEquals(102, result.getModifiedCount(), "Records modified should be 102 : ");
+ // 1 deleteOne + 100 deleteMany
+ assertEquals(101, result.getDeletedCount(), "Records deleted should be 101 : ");
+ }
+
+ @Test
+ public void testOrderedBulkWriteWithError() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ List<WriteModel<Document>> bulkOperations = Arrays
+ .asList(new InsertOneModel<>(new Document("scientist", "Pierre Curie")),
+ // this insert fails (_id "1" was already pumped in) and the ordered bulk stops here
+ new InsertOneModel<>(new Document("_id", "1")),
+ new InsertOneModel<>(new Document("scientist", "Descartes")),
+ new UpdateOneModel<>(
+ new Document("_id", "5"), new Document("$set", new Document("scientist", "Marie Curie"))),
+ new DeleteOneModel<>(new Document("_id", "2")));
+
+ try {
+ template.requestBody("direct:bulkWrite", bulkOperations, BulkWriteResult.class);
+ fail("Bulk operation should throw Exception");
+ } catch (CamelExecutionException e) {
+ extractAndAssertCamelMongoDbException(e, "duplicate key error");
+ // count = 1000 records + 1 inserted
+ assertEquals(1001, testCollection.countDocuments());
+ }
+ }
+
+ @Test
+ public void testUnorderedBulkWriteWithError() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ List<WriteModel<Document>> bulkOperations = Arrays
+ .asList(new InsertOneModel<>(new Document("scientist", "Pierre Curie")),
+ // this insert fails (_id "1" was already pumped in) but the unordered bulk continues
+ new InsertOneModel<>(new Document("_id", "1")),
+ new InsertOneModel<>(new Document("scientist", "Descartes")),
+ new UpdateOneModel<>(
+ new Document("_id", "5"), new Document("$set", new Document("scientist", "Marie Curie"))),
+ new DeleteOneModel<>(new Document("_id", "2")));
+ try {
+ template.requestBody("direct:unorderedBulkWrite", bulkOperations, BulkWriteResult.class);
+ fail("Bulk operation should throw Exception");
+ } catch (CamelExecutionException e) {
+ extractAndAssertCamelMongoDbException(e, "duplicate key error");
+ // count = 1000 + 2 inserted - 1 deleted
+ assertEquals(1001, testCollection.countDocuments());
+ }
+ }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
new file mode 100644
index 0000000..e773ccd
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.CreateCollectionOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.bson.types.ObjectId;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbChangeStreamsConsumerIT.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbChangeStreamsConsumerIT extends AbstractMongoDbITSupport {
+
+ private MongoCollection<Document> mongoCollection;
+ private String collectionName;
+
+ /** Drops and recreates the {@code camelTest} collection so each test starts from an empty one. */
+ @BeforeEach
+ public void before() {
+ collectionName = "camelTest";
+
+ // Drop whatever a previous test left behind, then create the collection explicitly.
+ db.getCollection(collectionName, Document.class).drop();
+ db.createCollection(collectionName, new CreateCollectionOptions());
+
+ mongoCollection = db.getCollection(collectionName, Document.class);
+ }
+
+ // Resets the shared mock:test endpoint after every test.
+ @AfterEach
+ public void after() {
+ getMockEndpoint("mock:test").reset();
+ }
+
+ /**
+ * Starts the {@code simpleConsumer} change-stream route, inserts ten documents and expects
+ * ten exchanges on {@code mock:test}.
+ */
+ @Test
+ public void basicTest() throws Exception {
+ assertEquals(0, mongoCollection.countDocuments());
+
+ String consumerRouteId = "simpleConsumer";
+ addTestRoutes();
+ context.getRouteController().startRoute(consumerRouteId);
+ try {
+ MockEndpoint mock = getMockEndpoint("mock:test");
+ mock.expectedMessageCount(10);
+
+ // Insert directly on the test thread so that a failing insert fails the test
+ // instead of dying silently on a helper thread.
+ for (int i = 0; i < 10; i++) {
+ mongoCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+ }
+
+ mock.assertIsSatisfied();
+ } finally {
+ // Stop the consumer even when an assertion fails so it cannot feed mock:test in later tests.
+ context.getRouteController().stopRoute(consumerRouteId);
+ }
+ }
+
+ /**
+ * Starts the {@code filterConsumer} route (change stream with the {@code {{myStreamFilter}}}
+ * stream filter), inserts ten documents and expects exactly one exchange whose body carries
+ * {@code string == "value2"}.
+ */
+ @Test
+ public void filterTest() throws Exception {
+ assertEquals(0, mongoCollection.countDocuments());
+
+ String consumerRouteId = "filterConsumer";
+ addTestRoutes();
+ context.getRouteController().startRoute(consumerRouteId);
+ try {
+ MockEndpoint mock = getMockEndpoint("mock:test");
+ mock.expectedMessageCount(1);
+
+ // Insert directly on the test thread so that a failing insert fails the test
+ // instead of dying silently on a helper thread.
+ for (int i = 0; i < 10; i++) {
+ mongoCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+ }
+
+ mock.assertIsSatisfied();
+
+ Document actualDocument = mock.getExchanges().get(0).getIn().getBody(Document.class);
+ assertEquals("value2", actualDocument.get("string"));
+ } finally {
+ // Stop the consumer even when an assertion fails so it cannot feed mock:test in later tests.
+ context.getRouteController().stopRoute(consumerRouteId);
+ }
+ }
+
+ /**
+ * Inserts and then deletes one document while the {@code simpleConsumer} route is running and
+ * checks the operation-type and {@code _id} headers of both exchanges, plus the body of the
+ * delete event (a document holding only the {@code _id}).
+ */
+ @Test
+ public void operationTypeAndIdHeaderTest() throws Exception {
+ assertEquals(0, mongoCollection.countDocuments());
+
+ String consumerRouteId = "simpleConsumer";
+ addTestRoutes();
+ context.getRouteController().startRoute(consumerRouteId);
+ try {
+ MockEndpoint mock = getMockEndpoint("mock:test");
+ mock.expectedMessageCount(2);
+
+ // Write directly on the test thread so that a failing write fails the test
+ // instead of dying silently on a helper thread.
+ ObjectId objectId = new ObjectId();
+ mongoCollection.insertOne(new Document("_id", objectId).append("string", "value"));
+ mongoCollection.deleteOne(new Document("_id", objectId));
+
+ mock.assertIsSatisfied();
+
+ Exchange insertExchange = mock.getExchanges().get(0);
+ assertEquals("insert", insertExchange.getIn().getHeader("CamelMongoDbStreamOperationType"));
+ assertEquals(objectId, insertExchange.getIn().getHeader("_id"));
+
+ Exchange deleteExchange = mock.getExchanges().get(1);
+ Document deleteBodyDocument = deleteExchange.getIn().getBody(Document.class);
+ assertEquals("delete", deleteExchange.getIn().getHeader("CamelMongoDbStreamOperationType"));
+ assertEquals(objectId, deleteExchange.getIn().getHeader("_id"));
+ assertEquals(1, deleteBodyDocument.size());
+ assertTrue(deleteBodyDocument.containsKey("_id"));
+ assertEquals(objectId.toHexString(), deleteBodyDocument.getObjectId("_id").toHexString());
+ } finally {
+ // Stop the consumer even when an assertion fails so it cannot feed mock:test in later tests.
+ context.getRouteController().stopRoute(consumerRouteId);
+ }
+ }
+
+ /**
+ * Adds the two change-stream consumer routes used by the tests. Both are registered with
+ * {@code autoStartup(false)}, so each test starts the one it needs by route id.
+ */
+ protected void addTestRoutes() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+
+ @Override
+ public void configure() {
+ // Unfiltered consumer: forwards every change event to mock:test.
+ from("mongodb:myDb?consumerType=changeStreams&database={{mongodb.testDb}}&collection={{mongodb.testCollection}}")
+ .id("simpleConsumer")
+ .autoStartup(false)
+ .to("mock:test");
+
+ // Filtered consumer: same source plus streamFilter={{myStreamFilter}} (property defined elsewhere); logs the body before forwarding.
+ from("mongodb:myDb?consumerType=changeStreams&database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&streamFilter={{myStreamFilter}}")
+ .id("filterConsumer")
+ .autoStartup(false)
+ .log("${body}")
+ .to("mock:test");
+ }
+ });
+ }
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbConnectionBeansIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbConnectionBeansIT.java
new file mode 100644
index 0000000..c424c0a
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbConnectionBeansIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.mongodb.MongoDbComponent;
+import org.apache.camel.component.mongodb.MongoDbEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbConnectionBeansIT.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbConnectionBeansIT extends AbstractMongoDbITSupport {
+ /**
+ * With the component-level connection cleared, an endpoint that names the client through
+ * {@code mongoConnection=#myDb} must use that client, while its connection bean name is not "myDb".
+ */
+ @Test
+ public void checkConnectionFromProperties() {
+ MongoClient registeredClient = MongoClients.create(service.getReplicaSetUrl());
+
+ MongoDbComponent component = context.getComponent(SCHEME, MongoDbComponent.class);
+ component.setMongoConnection(null);
+ context.getRegistry().bind("myDb", registeredClient);
+
+ String uri = "mongodb:anyName?mongoConnection=#myDb";
+ MongoDbEndpoint endpoint = context.getEndpoint(uri, MongoDbEndpoint.class);
+
+ assertNotEquals("myDb", endpoint.getConnectionBean());
+ assertEquals(registeredClient, endpoint.getMongoConnection());
+ }
+
+ /**
+ * With the component-level connection cleared, {@code mongodb:myDb} must pick up the client
+ * bound in the registry under "myDb".
+ */
+ @Test
+ public void checkConnectionFromBean() {
+ MongoClient registeredClient = MongoClients.create(service.getReplicaSetUrl());
+
+ MongoDbComponent component = context.getComponent(SCHEME, MongoDbComponent.class);
+ component.setMongoConnection(null);
+ context.getRegistry().bind("myDb", registeredClient);
+
+ MongoDbEndpoint endpoint = context.getEndpoint("mongodb:myDb", MongoDbEndpoint.class);
+
+ assertEquals("myDb", endpoint.getConnectionBean());
+ assertEquals(registeredClient, endpoint.getMongoConnection());
+ }
+
+ /**
+ * When both the URI path ("myDb") and the {@code mongoConnection} option ("#myDbS") name a
+ * registered client, the endpoint keeps "myDb" as connection bean name but uses the "myDbS" client.
+ */
+ @Test
+ public void checkConnectionBothExisting() {
+ MongoClient pathClient = MongoClients.create(service.getReplicaSetUrl());
+ MongoClient optionClient = MongoClients.create(service.getReplicaSetUrl());
+
+ MongoDbComponent component = context.getComponent(SCHEME, MongoDbComponent.class);
+ component.setMongoConnection(null);
+ context.getRegistry().bind("myDb", pathClient);
+ context.getRegistry().bind("myDbS", optionClient);
+
+ MongoDbEndpoint endpoint = context.getEndpoint("mongodb:myDb?mongoConnection=#myDbS", MongoDbEndpoint.class);
+ MongoClient expectedClient = context.getRegistry().lookupByNameAndType("myDbS", MongoClient.class);
+
+ assertEquals("myDb", endpoint.getConnectionBean());
+ assertEquals(expectedClient, endpoint.getMongoConnection());
+ }
+
+ // With the component-level connection cleared, resolving an endpoint whose name is
+ // (presumably) not bound in the registry is expected to throw.
+ @Test
+ public void checkMissingConnection() {
+ context.getComponent(SCHEME, MongoDbComponent.class).setMongoConnection(null);
+ assertThrows(Exception.class, () -> context.getEndpoint("mongodb:anythingNotRelated", MongoDbEndpoint.class));
+ }
+
+ /**
+ * An endpoint created from an arbitrary name must be a {@link MongoDbEndpoint} whose connection
+ * equals the {@code mongo} client of the test support class.
+ */
+ @Test
+ public void checkConnectionOnComponent() {
+ Endpoint resolved = context.getEndpoint("mongodb:justARouteName");
+
+ // assertIsInstanceOf returns the value already cast to the expected type.
+ MongoDbEndpoint mongoEndpoint = assertIsInstanceOf(MongoDbEndpoint.class, resolved);
+ assertEquals(mongo, mongoEndpoint.getMongoConnection());
+ }
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbConversionsIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbConversionsIT.java
new file mode 100644
index 0000000..ffd7b22
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbConversionsIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.component.mongodb.converters.MongoDbBasicConverters;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static com.mongodb.client.model.Filters.eq;
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbConversionsIT.class,
+ MongoDbConversionsIT.TestConfiguration.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbConversionsIT extends AbstractMongoDbITSupport {
+
+ // Spring configuration contributing the direct:* routes used by the tests of this class.
+ // Every route runs operation=insert against the database/collection given by the
+ // mongodb.testDb and mongodb.testCollection placeholders.
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:insertMap")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+ from("direct:insertPojo")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+ from("direct:insertJsonString")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+ // Same insert, but the resulting body is converted to a String afterwards.
+ from("direct:insertJsonStringWriteResultInString")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert")
+ .convertBodyTo(String.class);
+ }
+ };
+ }
+ }
+
+ /** Inserts a map containing a nested map and checks the document can be read back by its {@code _id}. */
+ @Test
+ public void testInsertMap() {
+ assertEquals(0, testCollection.countDocuments());
+
+ Map<String, String> nested = new HashMap<>();
+ nested.put("nested1", "nestedValue1");
+ nested.put("nested2", "nestedValue2");
+
+ Map<String, Object> payload = new HashMap<>();
+ payload.put("field1", "value1");
+ payload.put("field2", "value2");
+ payload.put("nestedField", nested);
+ payload.put(MONGO_ID, "testInsertMap");
+
+ template.requestBody("direct:insertMap", payload);
+
+ Document stored = testCollection.find(eq(MONGO_ID, "testInsertMap")).first();
+ assertNotNull(stored, "No record with 'testInsertMap' _id");
+ }
+
+ /** Sends a {@code MyPojoTest} instance to the insert route and looks it up by the {@code _id} the POJO declares. */
+ @Test
+ public void testInsertPojo() {
+ assertEquals(0, testCollection.countDocuments());
+
+ MyPojoTest pojo = new MyPojoTest();
+ template.requestBody("direct:insertPojo", pojo);
+
+ Document stored = testCollection.find(eq(MONGO_ID, "testInsertPojo")).first();
+ assertNotNull(stored, "No record with 'testInsertPojo' _id");
+ }
+
+ /** Sends a JSON string to the insert route and checks the document can be read back by its {@code _id}. */
+ @Test
+ public void testInsertJsonString() {
+ assertEquals(0, testCollection.countDocuments());
+
+ String json
+ = "{\"fruits\": [\"apple\", \"banana\", \"papaya\"], \"veggie\": \"broccoli\", \"_id\": \"testInsertJsonString\"}";
+ template.requestBody("direct:insertJsonString", json);
+
+ Document stored = testCollection.find(eq(MONGO_ID, "testInsertJsonString")).first();
+ assertNotNull(stored, "No record with 'testInsertJsonString' _id");
+ }
+
+ /** Sends JSON (with a trailing newline) as an input stream to the insert route and reads the document back. */
+ @Test
+ public void testInsertJsonInputStream() throws Exception {
+ assertEquals(0, testCollection.countDocuments());
+
+ String json = "{\"fruits\": [\"apple\", \"banana\"], \"veggie\": \"broccoli\", \"_id\": \"testInsertJsonString\"}\n";
+ template.requestBody("direct:insertJsonString", IOConverter.toInputStream(json, null));
+
+ Document stored = testCollection.find(eq(MONGO_ID, "testInsertJsonString")).first();
+ assertNotNull(stored, "No record with 'testInsertJsonString' _id");
+ }
+
+ /** Same as the input-stream insert, but the JSON text starts with whitespace. */
+ @Test
+ public void testInsertJsonInputStreamWithSpaces() throws Exception {
+ assertEquals(0, testCollection.countDocuments());
+
+ String paddedJson = " {\"test\": [\"test\"], \"_id\": \"testInsertJsonStringWithSpaces\"}\n";
+ template.requestBody("direct:insertJsonString", IOConverter.toInputStream(paddedJson, null));
+
+ Document stored = testCollection.find(eq(MONGO_ID, "testInsertJsonStringWithSpaces")).first();
+ assertNotNull(stored, "No record with 'testInsertJsonStringWithSpaces' _id");
+ }
+
+ @Test
+ public void testInsertBsonInputStream() {
+ assertEquals(0, testCollection.countDocuments());
+
+ Document document = new Document(MONGO_ID, "testInsertBsonString");
+
+ // Object result =
+ template.requestBody("direct:insertJsonString", new ByteArrayInputStream(document.toJson().getBytes()));
+ Document b = testCollection.find(eq(MONGO_ID, "testInsertBsonString")).first();
+ assertNotNull(b, "No record with 'testInsertBsonString' _id");
+ }
+
+ // Payload for testInsertPojo. The fields are never read in this class (hence the suppression);
+ // presumably the conversion to a MongoDB document picks them up by name - confirm against the converter.
+ @SuppressWarnings("unused")
+ private class MyPojoTest {
+ public int number = 123;
+ public String text = "hello";
+ public String[] array = { "daVinci", "copernico", "einstein" };
+ // The field has to be called _id, so the naming check is switched off around it.
+ // CHECKSTYLE:OFF
+ public String _id = "testInsertPojo";
+ // CHECKSTYLE:ON
+ }
+
+ /** A JSON array string converts to a non-null list with one element per array entry, including the empty array. */
+ @Test
+ public void shouldConvertJsonStringListToBSONList() {
+ List<Bson> twoEntries = MongoDbBasicConverters.fromStringToList("[{\"key\":\"value1\"}, {\"key\":\"value2\"}]");
+ assertNotNull(twoEntries);
+ assertEquals(2, twoEntries.size());
+
+ List<Bson> noEntries = MongoDbBasicConverters.fromStringToList("[]");
+ assertNotNull(noEntries);
+ assertEquals(0, noEntries.size());
+ }
+
+ // A single JSON object (not an array) is expected to yield null rather than a one-element list.
+ @Test
+ public void shouldNotConvertJsonStringListToBSONList() {
+ String jsonSingleValue = "{\"key\":\"value1\"}";
+ List<Bson> bsonList = MongoDbBasicConverters.fromStringToList(jsonSingleValue);
+ assertNull(bsonList);
+ }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbCredentialsFromUriConnectionIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbCredentialsFromUriConnectionIT.java
new file mode 100644
index 0000000..be5f1a8
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbCredentialsFromUriConnectionIT.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.mongodb.integration;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+ // Test class performs the same tests as MongoDbOperationsIT but with modified URIs
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbCredentialsFromUriConnectionIT.class,
+ MongoDbCredentialsFromUriConnectionIT.TestConfiguration.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbCredentialsFromUriConnectionIT extends MongoDbOperationsIT {
+
+ protected static final String AUTH_SOURCE_USER = "auth-source-user";
+ protected static final String AUTH_SOURCE_PASSWORD = "auth-source-password";
+
+ // Spring configuration contributing the direct:* routes for this class. The endpoints address
+ // MongoDB through hosts=... and, for most routes, username/password query parameters.
+ // service, USER, PASSWORD and dbName are not declared in this class (presumably inherited from the test support classes).
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ // base URI: host list only, no credentials
+ String uriHostnameOnly = String.format("mongodb:mongo?hosts=%s&", service.getConnectionAddress());
+ // connecting with credentials for created user
+ String uriWithCredentials = String.format("%susername=%s&password=%s&", uriHostnameOnly, USER, PASSWORD);
+
+ // credentials of the second user, authenticated against dbName via authSource
+ String uriWithAuthSource = String.format(
+ "%susername=%s&password=%s&authSource=%s&",
+ uriHostnameOnly, AUTH_SOURCE_USER, AUTH_SOURCE_PASSWORD, dbName);
+
+ // NOTE(review): count, testStoreOidOnInsert and aggregate use the URI without credentials - confirm this is intended
+ from("direct:count").to(
+ uriHostnameOnly + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=count&dynamicity=true");
+ from("direct:insert")
+ .to(uriWithCredentials
+ + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+ from("direct:testStoreOidOnInsert")
+ .to(uriHostnameOnly
+ + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert")
+ .setBody()
+ .header(MongoDbConstants.OID);
+ from("direct:save")
+ .to(uriWithCredentials
+ + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save");
+ from("direct:testStoreOidOnSave")
+ .to(uriWithCredentials
+ + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save")
+ .setBody()
+ .header(MongoDbConstants.OID);
+ from("direct:update")
+ .to(uriWithCredentials
+ + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=update");
+ from("direct:remove")
+ .to(uriWithCredentials
+ + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=remove");
+ from("direct:aggregate").to(
+ uriHostnameOnly + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate");
+ from("direct:getDbStats").to(uriWithCredentials + "database={{mongodb.testDb}}&operation=getDbStats");
+ from("direct:getColStats").to(
+ uriWithCredentials + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getColStats");
+ from("direct:command").to(uriWithCredentials + "database={{mongodb.testDb}}&operation=command");
+ // route exercised by testCountOperationAuthUser
+ from("direct:testAuthSource")
+ .to(uriWithAuthSource
+ + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=count&dynamicity=true");
+ }
+ };
+ }
+ }
+
+ // Before each test, sets up the users the routes authenticate with: the default one and
+ // AUTH_SOURCE_USER for dbName. Both helpers are defined outside this class.
+ @BeforeEach
+ public void before() {
+ createAuthorizationUser();
+ createAuthorizationUser(dbName, AUTH_SOURCE_USER, AUTH_SOURCE_PASSWORD);
+ }
+
+ /**
+ * Counts through the {@code direct:testAuthSource} route before and after inserting one
+ * document, expecting 0 and then 1, and finally removes a document again.
+ */
+ @Test
+ public void testCountOperationAuthUser() {
+ // The collection starts out empty, and the endpoint must report the same.
+ assertEquals(0, testCollection.countDocuments());
+ Object countBefore = template.requestBody("direct:testAuthSource", "irrelevantBody");
+ assertTrue(countBefore instanceof Long, "Result is not of type Long");
+ assertEquals(0L, countBefore, "Test collection should not contain any records");
+
+ // After a single insert the endpoint must report one record.
+ testCollection.insertOne(Document.parse("{a:60}"));
+ Object countAfter = template.requestBody("direct:testAuthSource", "irrelevantBody");
+ assertTrue(countAfter instanceof Long, "Result is not of type Long");
+ assertEquals(1L, countAfter, "Test collection should contain 1 record");
+
+ testCollection.deleteOne(new Document());
+ }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbDynamicityIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbDynamicityIT.java
new file mode 100644
index 0000000..eb98706
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbDynamicityIT.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.StreamSupport;
+
+import com.mongodb.client.MongoCollection;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static com.mongodb.client.model.Filters.eq;
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbDynamicityIT.class,
+ MongoDbDynamicityIT.TestConfiguration.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbDynamicityIT extends AbstractMongoDbITSupport {
+
+ // Spring configuration contributing three insert routes that differ only in the dynamicity
+ // option: not set, explicitly false, and true.
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:noDynamicity")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+ from("direct:noDynamicityExplicit").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=false");
+ from("direct:dynamicityEnabled").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=true");
+ }
+ };
+ }
+ }
+
+ @Test
+ public void testInsertDynamicityDisabled() {
+ assertEquals(0, testCollection.countDocuments());
+ mongo.getDatabase("otherDB").drop();
+ db.getCollection("otherCollection").drop();
+ assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+ "The otherDB database should not exist");
+
+ String body = "{\"_id\": \"testInsertDynamicityDisabled\", \"a\" : \"1\"}";
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(MongoDbConstants.DATABASE, "otherDB");
+ headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+ // Object result =
+ template.requestBodyAndHeaders("direct:noDynamicity", body, headers);
+
+ Document b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityDisabled")).first();
+ assertNotNull(b, "No record with 'testInsertDynamicityDisabled' _id");
+
+ body = "{\"_id\": \"testInsertDynamicityDisabledExplicitly\", \"a\" : \"1\"}";
+ // result =
+ template.requestBodyAndHeaders("direct:noDynamicityExplicit", body, headers);
+
+ b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityDisabledExplicitly")).first();
+ assertNotNull(b, "No record with 'testInsertDynamicityDisabledExplicitly' _id");
+
+ assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+ "The otherDB database should not exist");
+
+ }
+
+ @Test
+ public void testInsertDynamicityEnabledDBOnly() {
+ assertEquals(0, testCollection.countDocuments());
+ mongo.getDatabase("otherDB").drop();
+ db.getCollection("otherCollection").drop();
+ assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+ "The otherDB database should not exist");
+
+ String body = "{\"_id\": \"testInsertDynamicityEnabledDBOnly\", \"a\" : \"1\"}";
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(MongoDbConstants.DATABASE, "otherDB");
+ // Object result =
+ template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+ MongoCollection<Document> localDynamicCollection
+ = mongo.getDatabase("otherDB").getCollection(testCollection.getNamespace().getCollectionName(), Document.class);
+
+ Document b = localDynamicCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledDBOnly")).first();
+ assertNotNull(b, "No record with 'testInsertDynamicityEnabledDBOnly' _id");
+
+ b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledDBOnly")).first();
+ assertNull(b, "There is a record with 'testInsertDynamicityEnabledDBOnly' _id in the test collection");
+
+ assertTrue(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+ "The otherDB database should exist");
+
+ }
+
+ /**
+ * With dynamicity on and only the collection header set, the document must land in
+ * {@code otherCollection} of the {@code db} database, not in the test collection, and no
+ * {@code otherDB} database may appear.
+ */
+ @Test
+ public void testInsertDynamicityEnabledCollectionOnly() {
+ assertEquals(0, testCollection.countDocuments());
+ mongo.getDatabase("otherDB").drop();
+ db.getCollection("otherCollection").drop();
+ assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+ "The otherDB database should not exist");
+
+ String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionOnly\", \"a\" : \"1\"}";
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+ template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+ MongoCollection<Document> localDynamicCollection = db.getCollection("otherCollection", Document.class);
+
+ Document b = localDynamicCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledCollectionOnly")).first();
+ assertNotNull(b, "No record with 'testInsertDynamicityEnabledCollectionOnly' _id");
+
+ // Query this test's own _id: looking up another test's _id would make the check pass vacuously.
+ b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledCollectionOnly")).first();
+ assertNull(b, "There is a record with 'testInsertDynamicityEnabledCollectionOnly' _id in the test collection");
+
+ assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+ "The otherDB database should not exist");
+ }
+
+ /**
+ * With dynamicity on and both headers set, the document must land in
+ * {@code otherDB.otherCollection} and not in the test collection.
+ */
+ @Test
+ public void testInsertDynamicityEnabledDBAndCollection() {
+ assertEquals(0, testCollection.countDocuments());
+ mongo.getDatabase("otherDB").drop();
+ db.getCollection("otherCollection").drop();
+ assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+ "The otherDB database should not exist");
+
+ String body = "{\"_id\": \"testInsertDynamicityEnabledDBAndCollection\", \"a\" : \"1\"}";
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(MongoDbConstants.DATABASE, "otherDB");
+ headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+ template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+ MongoCollection<Document> localDynamicCollection
+ = mongo.getDatabase("otherDB").getCollection("otherCollection", Document.class);
+
+ Document b = localDynamicCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledDBAndCollection")).first();
+ assertNotNull(b, "No record with 'testInsertDynamicityEnabledDBAndCollection' _id");
+
+ // Query this test's own _id: looking up another test's _id would make the check pass vacuously.
+ b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledDBAndCollection")).first();
+ assertNull(b, "There is a record with 'testInsertDynamicityEnabledDBAndCollection' _id in the test collection");
+
+ assertTrue(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+ "The otherDB database should exist");
+ }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbExceptionHandlingIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbExceptionHandlingIT.java
new file mode 100644
index 0000000..9b3f478
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbExceptionHandlingIT.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import com.mongodb.DBObject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbExceptionHandlingIT.class,
+ MongoDbExceptionHandlingIT.TestConfiguration.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbExceptionHandlingIT extends AbstractMongoDbITSupport {
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:findAll").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findAll&dynamicity=true")
+ .to("mock:resultFindAll");
+
+ from("direct:findOneByQuery").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findOneByQuery&dynamicity=true")
+ .to("mock:resultFindOneByQuery");
+
+ from("direct:findById").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findById&dynamicity=true")
+ .to("mock:resultFindById");
+ }
+ };
+ }
+ }
+
+ @Test
+ public void testInduceParseException() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ // notice missing quote at the end of Einstein
+ try {
+ template.requestBody("direct:findOneByQuery", "{\"scientist\": \"Einstein}");
+ fail("Should have thrown an exception");
+ } catch (Exception e) {
+ extractAndAssertCamelMongoDbException(e, null);
+ }
+ }
+
+ @Test
+ public void testInduceParseAndThenOkException() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ // notice missing quote at the end of Einstein
+ try {
+ template.requestBody("direct:findOneByQuery", "{\"scientist\": \"Einstein}");
+ fail("Should have thrown an exception");
+ } catch (Exception e) {
+ extractAndAssertCamelMongoDbException(e, null);
+ }
+
+ // this one is okay
+ DBObject out = template.requestBody("direct:findOneByQuery", "{\"scientist\": \"Einstein\"}", DBObject.class);
+ assertNotNull(out);
+ assertEquals("Einstein", out.get("scientist"));
+ }
+
+ @Test
+ public void testErroneousDynamicOperation() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ try {
+ template.requestBodyAndHeader("direct:findOneByQuery", new Document("scientist", "Einstein").toJson(),
+ MongoDbConstants.OPERATION_HEADER, "dummyOp");
+ fail("Should have thrown an exception");
+ } catch (Exception e) {
+ extractAndAssertCamelMongoDbException(e, "Operation specified on header is not supported. Value: dummyOp");
+ }
+
+ }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbFindOperationIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbFindOperationIT.java
new file mode 100644
index 0000000..89e037d
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbFindOperationIT.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+import static org.apache.camel.test.junit5.TestSupport.assertListSize;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import static com.mongodb.client.model.Filters.eq;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.mongodb.MongoDbComponent;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.infra.mongodb.services.MongoDBLocalContainerService;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.camel.util.IOHelper;
+import org.apache.commons.lang3.ObjectUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.bson.types.ObjectId;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Projections;
+
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbFindOperationIT.class,
+ MongoDbFindOperationIT.TestConfiguration.class
+ }
+)
+public class MongoDbFindOperationIT {
+
+ @RegisterExtension
+ public static MongoDBLocalContainerService service;
+
+ protected static String dbName = "test";
+ protected static String testCollectionName;
+
+ private static MongoClient mongo;
+ private static MongoDatabase db;
+ private static MongoCollection<Document> testCollection;
+ @Autowired
+ CamelContext context;
+ private ProducerTemplate template;
+
+ static {
+ // This one requires Mongo 4.4. This is related to
+ // "CAMEL-15604 support allowDiskUse for MongoDB find operations"
+ service = new MongoDBLocalContainerService("mongo:4.4");
+
+ service.getContainer()
+ .waitingFor(Wait.forListeningPort())
+ .withCommand(
+ "--replSet", "replicationName",
+ "--oplogSize", "5000",
+ "--syncdelay", "0",
+ "--noauth");
+ }
+
+ @BeforeAll
+ public static void beforeClass() {
+ mongo = MongoClients.create(service.getReplicaSetUrl());
+ db = mongo.getDatabase(dbName);
+ }
+
+ @BeforeEach
+ public void before() {
+ // Refresh the test collection - drop it and recreate it. We don't do
+ // this for the database because MongoDB would create large
+ // store files each time
+ testCollectionName = "camelTest";
+ testCollection = db.getCollection(testCollectionName, Document.class);
+ testCollection.drop();
+ testCollection = db.getCollection(testCollectionName, Document.class);
+
+ template = context.createProducerTemplate();
+ }
+
+ private MockEndpoint getMockEndpoint(String endpoint) {
+ return (MockEndpoint) context.getEndpoint(endpoint);
+ }
+
+ protected void pumpDataIntoTestCollection() {
+ // there should be 100 of each
+ String[] scientists
+ = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
+ for (int i = 1; i <= 1000; i++) {
+ int index = i % scientists.length;
+ Formatter f = new Formatter();
+ String doc
+ = f.format("{\"_id\":\"%d\", \"scientist\":\"%s\", \"fixedField\": \"fixedValue\"}", i, scientists[index])
+ .toString();
+ IOHelper.close(f);
+ testCollection.insertOne(Document.parse(doc));
+ }
+ assertEquals(1000L, testCollection.countDocuments(), "Data pumping of 1000 entries did not complete entirely");
+ }
+
+ @Test
+ public void testFindAllNoCriteriaOperation() {
+ // Test that the collection has 0 documents in it
+ getMockEndpoint("mock:resultFindAll").reset();
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ Object result = template.requestBody("direct:findAll", ObjectUtils.NULL);
+ assertTrue(result instanceof List, "Result is not of type List");
+
+ @SuppressWarnings("unchecked")
+ List<Document> resultList = (List<Document>) result;
+
+ assertListSize("Result does not contain all entries in collection", resultList, 1000);
+
+ // Ensure that all returned documents contain all fields
+ for (Document document : resultList) {
+ assertNotNull(document.get(MONGO_ID), "Document in returned list should contain all fields");
+ assertNotNull(document.get("scientist"), "Document in returned list should contain all fields");
+ assertNotNull(document.get("fixedField"), "Document in returned list should contain all fields");
+ }
+
+ Exchange resultExchange = getMockEndpoint("mock:resultFindAll").getReceivedExchanges().get(0);
+ // TODO: decide what to do with total count
+ // assertEquals("Result total size header should equal 1000", 1000,
+ // resultExchange.getIn().getHeader(MongoDbConstants.RESULT_TOTAL_SIZE));
+ assertEquals(1000, resultExchange.getIn().getHeader(MongoDbConstants.RESULT_PAGE_SIZE),
+ "Result page size header should equal 1000");
+ }
+
+ @Test
+ public void testFindAllAllowDiskUse() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ Object result
+ = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.ALLOW_DISK_USE, true);
+ assertTrue(result instanceof List, "Result (allowDiskUse=true) is not of type List");
+ assertListSize("Result (allowDiskUse=true) does not contain all entries in collection", (List<Document>) result, 1000);
+
+ result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.ALLOW_DISK_USE, false);
+ assertTrue(result instanceof List, "Result (allowDiskUse=false) is not of type List");
+ assertListSize("Result (allowDiskUse=false) does not contain all entries in collection", (List<Document>) result,
+ 1000);
+
+ result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.ALLOW_DISK_USE, null);
+ assertTrue(result instanceof List, "Result (allowDiskUse=null) is not of type List");
+ assertListSize("Result (allowDiskUse=null) does not contain all entries in collection", (List<Document>) result, 1000);
+ }
+
+ @Test
+ public void testFindAllWithQueryAndNoFIlter() {
+ // Reset the mock endpoint and check that the collection starts with 0 documents
+ getMockEndpoint("mock:resultFindAll").reset();
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ Object result = template.requestBody("direct:findAll", eq("scientist", "Einstein"));
+ assertTrue(result instanceof List, "Result is not of type List");
+
+ @SuppressWarnings("unchecked")
+ List<Document> resultList = (List<Document>) result;
+
+ assertListSize("Result does not contain correct number of Einstein entries", resultList, 100);
+
+ // No projection is sent, so every returned document must carry all three
+ // fields, and the query must have restricted the result to 'Einstein'
+ for (Document document : resultList) {
+ assertNotNull(document.get(MONGO_ID), "Document in returned list does not contain field _id");
+ assertNotNull(document.get("scientist"), "Document in returned list does not contain field 'scientist'");
+ assertNotNull(document.get("fixedField"), "Document in returned list does not contain field 'fixedField'");
+ assertEquals("Einstein", document.get("scientist"), "Document.scientist should only be Einstein");
+ }
+
+ Exchange resultExchange = getMockEndpoint("mock:resultFindAll").getReceivedExchanges().get(0);
+ assertEquals(100, resultExchange.getIn().getHeader(MongoDbConstants.RESULT_PAGE_SIZE),
+ "Result page size header should equal 100");
+ }
+
+ @Test
+ public void testFindAllWithQueryAndFilter() {
+ // Reset the mock endpoint and check that the collection starts with 0 documents
+ getMockEndpoint("mock:resultFindAll").reset();
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+ Bson fieldFilter = Projections.exclude(MONGO_ID, "fixedField");
+ Bson query = eq("scientist", "Einstein");
+ Object result = template.requestBodyAndHeader("direct:findAll", query, MongoDbConstants.FIELDS_PROJECTION, fieldFilter);
+ assertTrue(result instanceof List, "Result is not of type List");
+
+ @SuppressWarnings("unchecked")
+ List<Document> resultList = (List<Document>) result;
+
+ assertListSize("Result does not contain correct number of Einstein entries", resultList, 100);
+
+ // Ensure that the projection stripped _id and fixedField from every document,
+ // and that only 'Einstein' documents were returned
+ for (Document document : resultList) {
+ assertNull(document.get(MONGO_ID), "Document in returned list should not contain field _id");
+ assertNotNull(document.get("scientist"), "Document in returned list does not contain field 'scientist'");
+ assertNull(document.get("fixedField"), "Document in returned list should not contain field fixedField");
+ assertEquals("Einstein", document.get("scientist"), "Document.scientist should only be Einstein");
+ }
+
+ Exchange resultExchange = getMockEndpoint("mock:resultFindAll").getReceivedExchanges().get(0);
+ assertEquals(100, resultExchange.getIn().getHeader(MongoDbConstants.RESULT_PAGE_SIZE),
+ "Result page size header should equal 100");
+ }
+
+ @Test
+ public void testFindAllNoCriteriaWithFilterOperation() {
+ // Reset the mock endpoint and check that the collection starts with 0 documents
+ getMockEndpoint("mock:resultFindAll").reset();
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ Bson fieldFilter = Projections.exclude(MONGO_ID, "fixedField");
+ Object result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.FIELDS_PROJECTION,
+ fieldFilter);
+ assertTrue(result instanceof List, "Result is not of type List");
+
+ @SuppressWarnings("unchecked")
+ List<Document> resultList = (List<Document>) result;
+
+ assertListSize("Result does not contain all entries in collection", resultList, 1000);
+
+ // Ensure that the projection stripped _id and fixedField but kept 'scientist'
+ for (Document document : resultList) {
+ assertNull(document.get(MONGO_ID), "Document in returned list should not contain field _id");
+ assertNotNull(document.get("scientist"), "Document in returned list does not contain field 'scientist'");
+ assertNull(document.get("fixedField"), "Document in returned list should not contain field fixedField");
+ }
+
+ Exchange resultExchange = getMockEndpoint("mock:resultFindAll").getReceivedExchanges().get(0);
+ // assertEquals("Result total size header should equal 1000", 1000,
+ // resultExchange.getIn().getHeader(MongoDbConstants.RESULT_TOTAL_SIZE));
+ assertEquals(1000, resultExchange.getIn().getHeader(MongoDbConstants.RESULT_PAGE_SIZE),
+ "Result page size header should equal 1000");
+ }
+
+ @Test
+ public void testFindAllIterationOperation() {
+ // Reset the mock endpoint and check that the collection starts with 0 documents
+ getMockEndpoint("mock:resultFindAll").reset();
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ // Page through the collection: 10 requests, each skipping the rows already read and capped at 'limit' rows
+ int numToSkip = 0;
+ final int limit = 100;
+ for (int i = 0; i < 10; i++) {
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(MongoDbConstants.NUM_TO_SKIP, numToSkip);
+ headers.put(MongoDbConstants.LIMIT, limit);
+ Object result = template.requestBodyAndHeaders("direct:findAll", ObjectUtils.NULL, headers);
+ assertTrue(result instanceof List, "Result is not of type List");
+
+ @SuppressWarnings("unchecked")
+ List<Document> resultList = (List<Document>) result;
+
+ assertListSize("Result does not contain 100 elements", resultList, limit);
+ assertEquals(numToSkip + 1, Integer.parseInt((String) resultList.get(0).get(MONGO_ID)),
+ "Id of first record is not as expected");
+
+ // Ensure that all returned documents contain all fields
+ for (Document document : resultList) {
+ assertNotNull(document.get(MONGO_ID), "Document in returned list should contain all fields");
+ assertNotNull(document.get("scientist"), "Document in returned list should contain all fields");
+ assertNotNull(document.get("fixedField"), "Document in returned list should contain all fields");
+ }
+
+ numToSkip += limit;
+ }
+
+ for (Exchange resultExchange : getMockEndpoint("mock:resultFindAll").getReceivedExchanges()) {
+ // TODO: decide what to do with the total number of elements
+ // assertEquals("Result total size header should equal 1000", 1000,
+ // resultExchange.getIn().getHeader(MongoDbConstants.RESULT_TOTAL_SIZE));
+ assertEquals(limit, resultExchange.getIn().getHeader(MongoDbConstants.RESULT_PAGE_SIZE),
+ "Result page size header should equal 100");
+ }
+ }
+
+ @Test
+ public void testFindDistinctNoQuery() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ Object result = template.requestBodyAndHeader("direct:findDistinct", null, MongoDbConstants.DISTINCT_QUERY_FIELD,
+ "scientist");
+ assertTrue(result instanceof List, "Result is not of type List");
+
+ @SuppressWarnings("unchecked")
+ List<String> resultList = (List<String>) result;
+ assertEquals(10, resultList.size());
+ }
+
+ @Test
+ public void testFindDistinctWithQuery() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ Bson query = eq("scientist", "Einstein");
+
+ Object result = template.requestBodyAndHeader("direct:findDistinct", query, MongoDbConstants.DISTINCT_QUERY_FIELD,
+ "scientist");
+ assertTrue(result instanceof List, "Result is not of type List");
+
+ @SuppressWarnings("unchecked")
+ List<String> resultList = (List<String>) result;
+ assertEquals(1, resultList.size());
+
+ assertEquals("Einstein", resultList.get(0));
+ }
+
+ @Test
+ public void testFindOneByQuery() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ Bson query = eq("scientist", "Einstein");
+ Document result = template.requestBody("direct:findOneByQuery", query, Document.class);
+ assertTrue(result instanceof Document, "Result is not of type Document");
+
+ assertNotNull(result.get(MONGO_ID), "Document in returned list should contain all fields");
+ assertNotNull(result.get("scientist"), "Document in returned list should contain all fields");
+ assertNotNull(result.get("fixedField"), "Document in returned list should contain all fields");
+ }
+
+ @Test
+ public void testFindOneById() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ Document result = template.requestBody("direct:findById", "240", Document.class);
+ assertTrue(result instanceof Document, "Result is not of type Document");
+
+ assertEquals("240", result.get(MONGO_ID), "The ID of the retrieved Document should equal 240");
+ assertEquals("Einstein", result.get("scientist"), "The scientist name of the retrieved Document should equal Einstein");
+
+ assertNotNull(result.get(MONGO_ID), "Document in returned list should contain all fields");
+ assertNotNull(result.get("scientist"), "Document in returned list should contain all fields");
+ assertNotNull(result.get("fixedField"), "Document in returned list should contain all fields");
+ }
+
+ @Test
+ public void testFindOneByIdWithObjectId() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ Document insertObject = new Document("scientist", "Einstein");
+ testCollection.insertOne(insertObject);
+ assertTrue(insertObject.get(MONGO_ID) instanceof ObjectId, "The ID of the inserted document should be ObjectId");
+ ObjectId id = insertObject.getObjectId(MONGO_ID);
+
+ Document result = template.requestBody("direct:findById", id, Document.class);
+ assertTrue(result instanceof Document, "Result is not of type Document");
+
+ assertTrue(result.get(MONGO_ID) instanceof ObjectId, "The ID of the retrieved Document should be ObjectId");
+ assertEquals(id, result.get(MONGO_ID), "The ID of the retrieved Document should equal to the inserted");
+ assertEquals("Einstein", result.get("scientist"), "The scientist name of the retrieved Document should equal Einstein");
+
+ assertNotNull(result.get(MONGO_ID), "Document in returned list should contain all fields");
+ assertNotNull(result.get("scientist"), "Document in returned list should contain all fields");
+ }
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public MongoClient mongoCLient() {
+ return mongo;
+ }
+
+ @Bean
+ public MongoDbComponent mongoDbComponent() {
+ MongoDbComponent component = new MongoDbComponent();
+ component.setMongoConnection(mongo);
+
+ return component;
+ }
+
+ @Bean
+ public void setProperties() {
+ context.getPropertiesComponent().setLocation("classpath:mongodb.test.properties");
+ }
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:findAll").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findAll&dynamicity=true")
+ .to("mock:resultFindAll");
+
+ from("direct:findOneByQuery").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findOneByQuery&dynamicity=true")
+ .to("mock:resultFindOneByQuery");
+
+ from("direct:findById").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findById&dynamicity=true")
+ .to("mock:resultFindById");
+
+ from("direct:findDistinct").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findDistinct&dynamicity=true")
+ .to("mock:resultFindDistinct");
+ }
+ };
+ }
+ }
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbHeaderHandlingIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbHeaderHandlingIT.java
new file mode 100644
index 0000000..6b91f0a
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbHeaderHandlingIT.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import static com.mongodb.client.model.Filters.eq;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.Test;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import com.mongodb.client.result.UpdateResult;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbHeaderHandlingIT.class,
+ MongoDbHeaderHandlingIT.TestConfiguration.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbHeaderHandlingIT extends AbstractMongoDbITSupport {
+
+ @Test
+ public void testInHeadersTransferredToOutOnCount() {
+ // a read operation
+ assertEquals(0, testCollection.countDocuments());
+ Exchange result = template.request("direct:count", new Processor() {
+ @Override
+ public void process(Exchange exchange) {
+ exchange.getIn().setBody("irrelevant body");
+ exchange.getIn().setHeader("abc", "def");
+ }
+ });
+ assertTrue(result.getMessage().getBody() instanceof Long, "Result is not of type Long");
+ assertEquals(0L, result.getMessage().getBody(), "Test collection should not contain any records");
+ assertEquals("def", result.getMessage().getHeader("abc"), "An input header was not returned");
+ }
+
+ @Test
+ public void testInHeadersTransferredToOutOnInsert() {
+ Exchange result = template.request("direct:insert", new Processor() {
+ @Override
+ public void process(Exchange exchange) {
+ exchange.getIn().setBody("{\"_id\":\"testInsertString\", \"scientist\":\"Einstein\"}");
+ exchange.getIn().setHeader("abc", "def");
+ }
+ });
+
+ // TODO: a WriteResult isn't returned when inserting, so the check below stays disabled
+ // assertTrue(result.getOut().getBody() instanceof WriteResult);
+ assertEquals("def", result.getMessage().getHeader("abc"), "An input header was not returned");
+ Document b = testCollection.find(eq(MONGO_ID, "testInsertString")).first();
+ assertNotNull(b, "No record with 'testInsertString' _id");
+ }
+
+ @Test
+ public void testWriteResultAsHeaderWithWriteOp() {
+ // Prepare test
+ assertEquals(0, testCollection.countDocuments());
+ Object[] req = new Object[] {
+ new Document(MONGO_ID, "testSave1").append("scientist", "Einstein").toJson(),
+ new Document(MONGO_ID, "testSave2").append("scientist", "Copernicus").toJson()};
+ // Insert both documents in a single request; the returned body is not inspected here
+ template.requestBody("direct:insert", req);
+ assertEquals(2, testCollection.countDocuments(), "Number of records persisted must be 2");
+
+ // Testing the save logic
+ final Document record1 = testCollection.find(eq(MONGO_ID, "testSave1")).first();
+ assertNotNull(record1, "No record with 'testSave1' _id");
+ assertEquals("Einstein", record1.get("scientist"), "Scientist field of 'testSave1' must equal 'Einstein'");
+ record1.put("scientist", "Darwin");
+
+ // test that as a payload, we get back exactly our input, but enriched
+ // with the CamelMongoDbWriteResult header
+ Exchange resultExch = template.request("direct:save", new Processor() {
+ @Override
+ public void process(Exchange exchange) {
+ exchange.getIn().setBody(record1);
+ }
+ });
+ assertTrue(resultExch.getMessage().getBody() instanceof Document);
+ assertEquals(record1, resultExch.getMessage().getBody());
+ assertTrue(resultExch.getMessage().getHeader(MongoDbConstants.WRITERESULT) instanceof UpdateResult);
+
+ Document record2 = testCollection.find(eq(MONGO_ID, "testSave1")).first();
+ assertEquals("Darwin", record2.get("scientist"),
+ "Scientist field of 'testSave1' must equal 'Darwin' after save operation");
+ }
+
+ @Test
+ public void testWriteResultAsHeaderWithReadOp() {
+ Exchange resultExch = template.request("direct:getDbStats", new Processor() {
+ @Override
+ public void process(Exchange exchange) {
+ exchange.getIn().setBody("irrelevantBody");
+ exchange.getIn().setHeader("abc", "def");
+ }
+ });
+ assertTrue(resultExch.getMessage().getBody() instanceof Document);
+ assertNull(resultExch.getMessage().getHeader(MongoDbConstants.WRITERESULT));
+ assertEquals("def", resultExch.getMessage().getHeader("abc"));
+ }
+
+ @Configuration
+ public class TestConfiguration {
+
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:count").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=count&dynamicity=true");
+ from("direct:save").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save&writeResultAsHeader=true");
+ from("direct:getDbStats").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getDbStats&writeResultAsHeader=true");
+
+ // supporting routes
+ from("direct:insert")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+ }
+ };
+ }
+ }
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbIndexIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbIndexIT.java
new file mode 100644
index 0000000..cca8903
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbIndexIT.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import static com.mongodb.client.model.Filters.eq;
+import static com.mongodb.client.model.Indexes.ascending;
+import static com.mongodb.client.model.Indexes.descending;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import com.mongodb.client.ListIndexesIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.StreamSupport;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbIndexIT.class,
+ MongoDbIndexIT.TestConfiguration.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbIndexIT extends AbstractMongoDbITSupport {
+
+ @Test
+ public void testInsertDynamicityEnabledDBAndCollectionAndIndex() {
+     // The DATABASE, COLLECTION and COLLECTION_INDEX headers must send the insert to otherDB.otherCollection
+     final String docId = "testInsertDynamicityEnabledDBAndCollection";
+     assertEquals(0, testCollection.countDocuments());
+     mongo.getDatabase("otherDB").drop();
+     db.getCollection("otherCollection").drop();
+     assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+             "The otherDB database should not exist");
+
+     String body = "{\"_id\": \"" + docId + "\", \"a\" : 1, \"b\" : 2}";
+     Map<String, Object> headers = new HashMap<>();
+     headers.put(MongoDbConstants.DATABASE, "otherDB");
+     headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+
+     // Requested indexes: ascending on "a", descending on "b"
+     List<Document> objIndex = new ArrayList<>();
+     objIndex.add(new Document("a", 1));
+     objIndex.add(new Document("b", -1));
+     headers.put(MongoDbConstants.COLLECTION_INDEX, objIndex);
+
+     Object result = template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+     assertEquals(Document.class, result.getClass(), "Response isn't of type Document");
+
+     MongoCollection<Document> localDynamicCollection
+             = mongo.getDatabase("otherDB").getCollection("otherCollection", Document.class);
+
+     ListIndexesIterable<Document> indexInfos = localDynamicCollection.listIndexes(Document.class);
+     // try-with-resources closes the cursor even when an assertion below fails
+     try (MongoCursor<Document> iterator = indexInfos.iterator()) {
+         // skip the first listed index (presumably the default _id index)
+         iterator.next();
+         Document key1 = iterator.next().get("key", Document.class);
+         Document key2 = iterator.next().get("key", Document.class);
+         assertTrue(key1.containsKey("a") && 1 == key1.getInteger("a"), "No index on the field a");
+         assertTrue(key2.containsKey("b") && -1 == key2.getInteger("b"), "No index on the field b");
+     }
+
+     Document b = localDynamicCollection.find(new Document(MONGO_ID, docId)).first();
+     assertNotNull(b, "No record with 'testInsertDynamicityEnabledDBAndCollection' _id");
+
+     // The same _id must not have been written to the configured test collection
+     b = testCollection.find(new Document(MONGO_ID, docId)).first();
+     assertNull(b, "There is a record with 'testInsertDynamicityEnabledDBAndCollection' _id in the test collection");
+
+     assertTrue(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+             "The otherDB database should exist");
+ }
+
+ @Test
+ public void testInsertDynamicityEnabledCollectionAndIndex() {
+     // With only COLLECTION and COLLECTION_INDEX headers the insert must land in otherCollection of the configured database
+     final String docId = "testInsertDynamicityEnabledCollectionAndIndex";
+     assertEquals(0, testCollection.countDocuments());
+     mongo.getDatabase("otherDB").drop();
+     db.getCollection("otherCollection").drop();
+     assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+             "The otherDB database should not exist");
+
+     String body = "{\"_id\": \"" + docId + "\", \"a\" : 1, \"b\" : 2}";
+     Map<String, Object> headers = new HashMap<>();
+     headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+     List<Bson> objIndex = Arrays.asList(ascending("a"), descending("b"));
+     headers.put(MongoDbConstants.COLLECTION_INDEX, objIndex);
+
+     Object result = template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+     assertEquals(Document.class, result.getClass(), "Response isn't of type Document");
+
+     MongoCollection<Document> localDynamicCollection = db.getCollection("otherCollection", Document.class);
+     // try-with-resources closes the cursor even when an assertion below fails
+     try (MongoCursor<Document> indexInfos = localDynamicCollection.listIndexes(Document.class).iterator()) {
+         indexInfos.next(); // skip the first listed index (presumably the default _id index)
+         Document key1 = indexInfos.next().get("key", Document.class);
+         Document key2 = indexInfos.next().get("key", Document.class);
+         assertTrue(key1.containsKey("a") && 1 == key1.getInteger("a"), "No index on the field a");
+         assertTrue(key2.containsKey("b") && -1 == key2.getInteger("b"), "No index on the field b");
+     }
+
+     Document b = localDynamicCollection.find(eq(MONGO_ID, docId)).first();
+     assertNotNull(b, "No record with 'testInsertDynamicityEnabledCollectionAndIndex' _id");
+
+     b = testCollection.find(eq(MONGO_ID, docId)).first();
+     assertNull(b, "There is a record with 'testInsertDynamicityEnabledCollectionAndIndex' _id in the test collection");
+
+     for (String dbName : mongo.listDatabaseNames()) {
+         assertFalse(dbName.contains("otherDB"), "The otherDB database should not exist");
+     }
+ }
+
+ @Test
+ public void testInsertDynamicityEnabledCollectionOnlyAndURIIndex() {
+     // The index is declared through collectionIndex on the endpoint URI (see TestConfiguration), not a header
+     assertEquals(0, testCollection.countDocuments());
+     mongo.getDatabase("otherDB").drop();
+     db.getCollection("otherCollection").drop();
+     assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+             "The otherDB database should not exist");
+
+     String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionOnlyAndURIIndex\", \"a\" : 1, \"b\" : 2}";
+     Map<String, Object> headers = new HashMap<>();
+     headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+
+     Object result = template.requestBodyAndHeaders("direct:dynamicityEnabledWithIndexUri", body, headers);
+
+     assertEquals(Document.class, result.getClass(), "Response isn't of type Document");
+
+     MongoCollection<Document> localDynamicCollection = db.getCollection("otherCollection", Document.class);
+     // NOTE(review): only the first listed index is checked, and the assertion passes whenever it has no "a" key - confirm intent
+     try (MongoCursor<Document> indexInfos = localDynamicCollection.listIndexes().iterator()) {
+         Document key1 = indexInfos.next().get("key", Document.class);
+         assertFalse(key1.containsKey("a") && "-1".equals(key1.getString("a")), "No index on the field a");
+     }
+
+     Document b = localDynamicCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledCollectionOnlyAndURIIndex")).first();
+     assertNotNull(b, "No record with 'testInsertDynamicityEnabledCollectionOnlyAndURIIndex' _id");
+
+     b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledCollectionOnlyAndURIIndex")).first();
+     assertNull(b,
+             "There is a record with 'testInsertDynamicityEnabledCollectionOnlyAndURIIndex' _id in the test collection");
+
+     assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+             "The otherDB database should not exist");
+ }
+
+ @Disabled
+ @Test
+ public void testInsertAutoCreateCollectionAndURIIndex() {
+     assertEquals(0, testCollection.countDocuments());
+     db.getCollection("otherCollection").deleteOne(new Document());
+
+     String body = "{\"_id\": \"testInsertAutoCreateCollectionAndURIIndex\", \"a\" : 1, \"b\" : 2}";
+     Map<String, Object> headers = new HashMap<>();
+
+     Object result = template.requestBodyAndHeaders("direct:dynamicityDisabled", body, headers);
+     assertEquals(Document.class, result.getClass(), "Response isn't of type Document");
+
+     MongoCollection<Document> collection = db.getCollection("otherCollection", Document.class);
+     // NOTE(review): the getString checks look like they expect string-valued key specs - confirm before re-enabling
+     try (MongoCursor<Document> indexInfos = collection.listIndexes().iterator()) {
+         Document key1 = indexInfos.next().get("key", Document.class);
+         Document key2 = indexInfos.next().get("key", Document.class);
+         assertTrue(key1.containsKey("b") && "-1".equals(key1.getString("b")), "No index on the field b");
+         assertTrue(key2.containsKey("a") && "1".equals(key2.getString("a")), "No index on the field a");
+     }
+
+     Document b = collection.find(eq(MONGO_ID, "testInsertAutoCreateCollectionAndURIIndex")).first();
+     assertNotNull(b, "No record with 'testInsertAutoCreateCollectionAndURIIndex' _id");
+
+     b = testCollection.find(eq(MONGO_ID, "testInsertAutoCreateCollectionAndURIIndex")).first();
+     assertNull(b, "There is a record with 'testInsertAutoCreateCollectionAndURIIndex' _id in the test collection");
+
+     assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+             "The otherDB database should not exist");
+ }
+
+ @Configuration
+ public class TestConfiguration { // NOTE(review): non-static inner class; presumably relies on the enclosing test class being listed in @SpringBootTest(classes) - confirm
+ // Registers the Camel routes used by the index tests of the enclosing class.
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:dynamicityEnabled").to( // insert with dynamicity=true and no collectionIndex on the URI
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=true");
+ from("direct:dynamicityEnabledWithIndexUri") // insert with dynamicity=true and collectionIndex={"a":1} on the URI
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&collectionIndex={\"a\":1}&operation=insert&dynamicity=true");
+ from("direct:dynamicityDisabled") // insert into the fixed otherCollection with dynamicity=false and a two-field collectionIndex
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection=otherCollection&collectionIndex={\"a\":1,\"b\":-1}&operation=insert&dynamicity=false");
+ }
+ };
+ }
+ }
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbInsertBatchIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbInsertBatchIT.java
new file mode 100644
index 0000000..ec10e17
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbInsertBatchIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.Test;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import com.mongodb.BasicDBObject;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbInsertBatchIT.class,
+ MongoDbInsertBatchIT.TestConfiguration.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbInsertBatchIT extends AbstractMongoDbITSupport {
+
+ @Test
+ public void testInsertBatch() {
+ assertEquals(0, testCollection.countDocuments());
+
+ Document a = new Document(MongoDbConstants.MONGO_ID, "testInsert1");
+ a.append("MyId", 1).toJson();
+ Document b = new Document(MongoDbConstants.MONGO_ID, "testInsert2");
+ b.append("MyId", 2).toJson();
+ Document c = new Document(MongoDbConstants.MONGO_ID, "testInsert3");
+ c.append("MyId", 3).toJson();
+
+ List<Document> taxGroupList = new ArrayList<>();
+ taxGroupList.add(a);
+ taxGroupList.add(b);
+ taxGroupList.add(c);
+
+ Exchange out = context.createFluentProducerTemplate()
+ .to("direct:insert").withBody(taxGroupList).send();
+
+ List oid = out.getMessage().getHeader(MongoDbConstants.OID, List.class);
+ assertNotNull(oid);
+ assertEquals(3, oid.size());
+
+ Document out1 = testCollection.find(new BasicDBObject("_id", oid.get(0))).first();
+ assertNotNull(out1);
+ assertEquals(1, out1.getInteger("MyId"));
+ Document out2 = testCollection.find(new BasicDBObject("_id", oid.get(1))).first();
+ assertNotNull(out2);
+ assertEquals(2, out2.getInteger("MyId"));
+ Document out3 = testCollection.find(new BasicDBObject("_id", oid.get(2))).first();
+ assertNotNull(out3);
+ assertEquals(3, out3.getInteger("MyId"));
+ }
+
+ @Configuration
+ public class TestConfiguration { // NOTE(review): non-static inner class; presumably relies on the enclosing test class being listed in @SpringBootTest(classes) - confirm
+ // Registers the single insert route used by testInsertBatch.
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:insert") // insert into the configured test database and collection
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+ }
+ };
+ }
+ }
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbOperationsIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbOperationsIT.java
new file mode 100644
index 0000000..3531df4
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbOperationsIT.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import java.util.Arrays;
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.List;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.result.DeleteResult;
+import com.mongodb.client.result.UpdateResult;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.component.mongodb.MongoDbOperation;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.bson.types.ObjectId;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static com.mongodb.client.model.Accumulators.sum;
+import static com.mongodb.client.model.Aggregates.group;
+import static com.mongodb.client.model.Aggregates.match;
+import static com.mongodb.client.model.Filters.eq;
+import static com.mongodb.client.model.Filters.or;
+import static com.mongodb.client.model.Updates.combine;
+import static com.mongodb.client.model.Updates.currentTimestamp;
+import static com.mongodb.client.model.Updates.set;
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+import static org.apache.camel.test.junit5.TestSupport.assertListSize;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbOperationsIT.class,
+ MongoDbOperationsIT.TestConfiguration.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbOperationsIT extends AbstractMongoDbITSupport {
+
+ @Test
+ public void testCountOperation() {
+     // Empty collection: the count endpoint must answer 0 as a Long
+     assertEquals(0, testCollection.countDocuments());
+     Object emptyCount = template.requestBody("direct:count", "irrelevantBody");
+     assertTrue(emptyCount instanceof Long, "Result is not of type Long");
+     assertEquals(0L, emptyCount, "Test collection should not contain any records");
+
+     // One document inserted directly into the collection: the endpoint must now answer 1
+     testCollection.insertOne(Document.parse("{a:60}"));
+     Object singleCount = template.requestBody("direct:count", "irrelevantBody");
+     assertTrue(singleCount instanceof Long, "Result is not of type Long");
+     assertEquals(1L, singleCount, "Test collection should contain 1 record");
+     testCollection.deleteOne(new Document());
+
+     // Dynamicity: the COLLECTION header points the count at the dynamic collection
+     dynamicCollection.insertOne(Document.parse("{a:60}"));
+     Object dynamicCount = template.requestBodyAndHeader(
+             "direct:count", "irrelevantBody", MongoDbConstants.COLLECTION, dynamicCollectionName);
+     assertTrue(dynamicCount instanceof Long, "Result is not of type Long");
+     assertEquals(1L, dynamicCount, "Dynamic collection should contain 1 record");
+
+ }
+
+ @Test
+ public void testInsertString() {
+     assertEquals(0, testCollection.countDocuments());
+     String json = new Document(MONGO_ID, "testInsertString").append("scientist", "Einstein").toJson(); // body is sent as a JSON String
+     Object reply = template.requestBody("direct:insert", json);
+     assertTrue(reply instanceof Document);
+     Document stored = testCollection.find(eq(MONGO_ID, "testInsertString")).first();
+     assertNotNull(stored, "No record with 'testInsertString' _id");
+ }
+
+ @Test
+ public void testStoreOidOnInsert() {
+     Document toInsert = new Document(); // no _id is set before the insert
+     ObjectId returnedOid = template.requestBody("direct:testStoreOidOnInsert", toInsert, ObjectId.class);
+     assertEquals(toInsert.get(MONGO_ID), returnedOid); // the route replies with the OID header as body
+ }
+
+ @Test
+ public void testStoreOidsOnInsert() {
+     Document firstDocument = new Document();
+     Document secondDocument = new Document();
+     // the route replies with the OID header; for a list body it is read here as a List of ids
+     List<?> oids = template.requestBody("direct:testStoreOidOnInsert", Arrays.asList(firstDocument, secondDocument), List.class);
+     assertTrue(oids.contains(firstDocument.get(MONGO_ID)));
+     assertTrue(oids.contains(secondDocument.get(MONGO_ID)));
+ }
+
+ @Test
+ public void testSave() {
+     // Seed two documents through the insert route
+     assertEquals(0, testCollection.countDocuments());
+     String einstein = new Document(MONGO_ID, "testSave1").append("scientist", "Einstein").toJson();
+     String copernicus = new Document(MONGO_ID, "testSave2").append("scientist", "Copernicus").toJson();
+     Object insertReply = template.requestBody("direct:insert", new Object[] { einstein, copernicus });
+     assertTrue(insertReply instanceof List);
+     assertEquals(2, testCollection.countDocuments(), "Number of records persisted must be 2");
+
+     // Change a field on the stored document and push it back through the save route
+     Document stored = testCollection.find(eq(MONGO_ID, "testSave1")).first();
+     assertEquals("Einstein", stored.get("scientist"), "Scientist field of 'testSave1' must equal 'Einstein'");
+     stored.put("scientist", "Darwin");
+
+     Object saveReply = template.requestBody("direct:save", stored);
+     assertTrue(saveReply instanceof UpdateResult);
+
+     // Re-read from the collection to confirm the change was persisted
+     Document reloaded = testCollection.find(eq(MONGO_ID, "testSave1")).first();
+     assertEquals("Darwin", reloaded.get("scientist"),
+             "Scientist field of 'testSave1' must equal 'Darwin' after save operation");
+
+ }
+
+ @Test
+ public void testSaveWithoutId() {
+     // Prepare test
+     assertEquals(0, testCollection.countDocuments());
+     // Pre-existing document that the save below must leave untouched
+     Document untouched = new Document("scientist", "Copernic");
+     template.requestBody("direct:insert", untouched);
+     // Saving (upserting) a document that has no _id must insert it under a new id
+     Document unsaved = new Document("scientist", "Einstein");
+     assertNull(unsaved.get(MONGO_ID));
+     UpdateResult outcome = template.requestBody("direct:save", unsaved, UpdateResult.class);
+     assertNotNull(outcome.getUpsertedId());
+     // An insert rather than an update: nothing may be reported as modified
+     assertEquals(0, outcome.getModifiedCount());
+     // The new document must be retrievable by the upserted id
+     Document stored = testCollection.find(eq(MONGO_ID, outcome.getUpsertedId())).first();
+     assertEquals("Einstein", stored.get("scientist"),
+             "Scientist field of '" + outcome.getUpsertedId() + "' must equal 'Einstein'");
+ }
+
+ @Test
+ public void testStoreOidOnSaveWithoutId() {
+     Document withoutId = new Document(); // saved without an _id
+     ObjectId generatedOid = template.requestBody("direct:testStoreOidOnSave", withoutId, ObjectId.class);
+     assertNotNull(generatedOid); // the route replies with the OID header as body
+ }
+
+ @Test
+ public void testStoreOidOnSave() {
+     Document withPresetId = new Document(MONGO_ID, new ObjectId("5847e39e0824d6b54194e197"));
+     ObjectId returnedOid = template.requestBody("direct:testStoreOidOnSave", withPresetId, ObjectId.class);
+     assertEquals(withPresetId.get(MONGO_ID), returnedOid); // the preset _id must come back unchanged
+ }
+
+ @Test
+ public void testUpdate() {
+     // Prepare test: 100 documents, the odd-numbered ones carrying extraField=true
+     assertEquals(0, testCollection.countDocuments());
+     for (int i = 1; i <= 100; i++) {
+         // even i: plain document; odd i: document with extraField=true
+         String pattern = i % 2 == 0
+                 ? "{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\"}"
+                 : "{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\", \"extraField\": true}";
+         String body;
+         // try-with-resources closes the Formatter; no explicit close() is needed
+         try (Formatter f = new Formatter()) {
+             body = f.format(pattern, i).toString();
+         }
+         template.requestBody("direct:insert", body);
+     }
+     assertEquals(100L, testCollection.countDocuments());
+
+     // Testing the update logic
+     Bson extraField = eq("extraField", true);
+     assertEquals(50L, testCollection.countDocuments(extraField),
+             "Number of records with 'extraField' flag on must equal 50");
+     assertEquals(0, testCollection.countDocuments(new Document("scientist", "Darwin")),
+             "Number of records with 'scientist' field = Darwin on must equal 0");
+
+     Bson updateObj = combine(set("scientist", "Darwin"), currentTimestamp("lastModified"));
+
+     Exchange resultExchange = template.request("direct:update", new Processor() {
+         @Override
+         public void process(Exchange exchange) {
+             exchange.getIn().setBody(new Bson[] { extraField, updateObj });
+             exchange.getIn().setHeader(MongoDbConstants.MULTIUPDATE, true);
+         }
+     });
+     Object result = resultExchange.getMessage().getBody();
+     assertTrue(result instanceof UpdateResult);
+     assertEquals(50L, resultExchange.getMessage().getHeader(MongoDbConstants.RECORDS_AFFECTED),
+             "Number of records updated header should equal 50");
+
+     assertEquals(50, testCollection.countDocuments(new Document("scientist", "Darwin")),
+             "Number of records with 'scientist' field = Darwin on must equal 50 after update");
+ }
+
+ @Test
+ public void testUpdateFromString() {
+     // Prepare test: 100 documents, the odd-numbered ones carrying extraField=true
+     assertEquals(0, testCollection.countDocuments());
+     for (int i = 1; i <= 100; i++) {
+         // even i: plain document; odd i: document with extraField=true
+         String pattern = i % 2 == 0
+                 ? "{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\"}"
+                 : "{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\", \"extraField\": true}";
+         String body;
+         // try-with-resources closes the Formatter; no explicit close() is needed
+         try (Formatter f = new Formatter()) {
+             body = f.format(pattern, i).toString();
+         }
+         template.requestBody("direct:insert", body);
+     }
+     assertEquals(100L, testCollection.countDocuments());
+
+     // Testing the update logic
+     Bson extraField = eq("extraField", true);
+     assertEquals(50L, testCollection.countDocuments(extraField),
+             "Number of records with 'extraField' flag on must equal 50");
+     assertEquals(0, testCollection.countDocuments(new Document("scientist", "Darwin")),
+             "Number of records with 'scientist' field = Darwin on must equal 0");
+
+     Bson updateObj = combine(set("scientist", "Darwin"), currentTimestamp("lastModified"));
+
+     String updates
+             = "[" + extraField.toBsonDocument(Document.class, MongoClientSettings.getDefaultCodecRegistry()).toJson() + ","
+               + updateObj.toBsonDocument(Document.class, MongoClientSettings.getDefaultCodecRegistry()).toJson() + "]";
+
+     Exchange resultExchange = template.request("direct:update", new Processor() {
+         @Override
+         public void process(Exchange exchange) {
+             exchange.getIn().setBody(updates);
+             exchange.getIn().setHeader(MongoDbConstants.MULTIUPDATE, true);
+         }
+     });
+     Object result = resultExchange.getMessage().getBody();
+     assertTrue(result instanceof UpdateResult);
+     assertEquals(50L, resultExchange.getMessage().getHeader(MongoDbConstants.RECORDS_AFFECTED),
+             "Number of records updated header should equal 50");
+
+     assertEquals(50, testCollection.countDocuments(new Document("scientist", "Darwin")),
+             "Number of records with 'scientist' field = Darwin on must equal 50 after update");
+ }
+
+ @Test
+ public void testUpdateUsingFieldsFilterHeader() {
+     // Prepare test: 100 documents, the odd-numbered ones carrying extraField=true
+     assertEquals(0, testCollection.countDocuments());
+     for (int i = 1; i <= 100; i++) {
+         // even i: plain document; odd i: document with extraField=true
+         String pattern = i % 2 == 0
+                 ? "{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\"}"
+                 : "{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\", \"extraField\": true}";
+         String body;
+         // try-with-resources closes the Formatter; no explicit close() is needed
+         try (Formatter f = new Formatter()) {
+             body = f.format(pattern, i).toString();
+         }
+         template.requestBody("direct:insert", body);
+     }
+     assertEquals(100L, testCollection.countDocuments());
+
+     // Testing the update logic
+     Bson extraField = eq("extraField", true);
+     assertEquals(50L, testCollection.countDocuments(extraField),
+             "Number of records with 'extraField' flag on must equal 50");
+     assertEquals(0, testCollection.countDocuments(new Document("scientist", "Darwin")),
+             "Number of records with 'scientist' field = Darwin on must equal 0");
+
+     Bson updateObj = combine(set("scientist", "Darwin"), currentTimestamp("lastModified"));
+     HashMap<String, Object> headers = new HashMap<>();
+     headers.put(MongoDbConstants.MULTIUPDATE, true);
+     headers.put(MongoDbConstants.CRITERIA, extraField);
+     Object result = template.requestBodyAndHeaders("direct:update", updateObj, headers);
+     assertTrue(result instanceof UpdateResult);
+     assertEquals(50L, UpdateResult.class.cast(result).getModifiedCount(),
+             "Number of records updated header should equal 50");
+     assertEquals(50, testCollection.countDocuments(new Document("scientist", "Darwin")),
+             "Number of records with 'scientist' field = Darwin on must equal 50 after update");
+ }
+
+ @Test
+ public void testRemove() {
+     // Prepare test: 100 documents, the odd-numbered ones carrying extraField=true
+     assertEquals(0, testCollection.countDocuments());
+     for (int i = 1; i <= 100; i++) {
+         // even i: plain document; odd i: document with extraField=true
+         String pattern = i % 2 == 0
+                 ? "{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\"}"
+                 : "{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\", \"extraField\": true}";
+         String body;
+         // try-with-resources closes the Formatter; no explicit close() is needed
+         try (Formatter f = new Formatter()) {
+             body = f.format(pattern, i).toString();
+         }
+         template.requestBody("direct:insert", body);
+     }
+     assertEquals(100L, testCollection.countDocuments());
+
+     // Testing the remove logic
+     Bson extraField = Filters.eq("extraField", true);
+     assertEquals(50L, testCollection.countDocuments(extraField),
+             "Number of records with 'extraField' flag on must equal 50");
+
+     Exchange resultExchange = template.request("direct:remove", new Processor() {
+         @Override
+         public void process(Exchange exchange) {
+             exchange.getIn().setBody(extraField);
+         }
+     });
+     Object result = resultExchange.getMessage().getBody();
+     assertTrue(result instanceof DeleteResult);
+     assertEquals(50L, resultExchange.getMessage().getHeader(MongoDbConstants.RECORDS_AFFECTED),
+             "Number of records deleted header should equal 50");
+
+     assertEquals(0, testCollection.countDocuments(extraField),
+             "Number of records with 'extraField' flag on must be 0 after remove");
+
+ }
+
+ @Test
+ public void testAggregate() {
+ // Test that the collection has 0 documents in it
+ assertEquals(0, testCollection.countDocuments());
+ pumpDataIntoTestCollection();
+
+ // Pipeline: keep only the Darwin and Einstein records, then count them per scientist
+ List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist", "Einstein"))),
+ group("$scientist", sum("count", 1)));
+ Object result = template.requestBody("direct:aggregate", aggregate);
+ assertTrue(result instanceof List, "Result is not of type List");
+
+ @SuppressWarnings("unchecked")
+ List<Document> resultList = (List<Document>) result;
+ assertListSize("Result does not contain 2 elements", resultList, 2);
+ // TODO Add more asserts
+ }
+
+ @Test
+ public void testDbStats() {
+     assertEquals(0, testCollection.countDocuments());
+     Object reply = template.requestBody("direct:getDbStats", "irrelevantBody"); // the test sends a placeholder body
+     assertTrue(reply instanceof Document, "Result is not of type Document");
+     assertTrue(!((Document) reply).keySet().isEmpty(), "The result should contain keys");
+ }
+
+ @Test
+ public void testColStats() {
+     assertEquals(0, testCollection.countDocuments());
+
+     // Add some records to the collection (and do it via camel-mongodb)
+     for (int i = 1; i <= 100; i++) {
+         String body;
+         // try-with-resources closes the Formatter; no explicit close() is needed
+         try (Formatter f = new Formatter()) {
+             body = f.format("{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\"}", i).toString();
+         }
+         template.requestBody("direct:insert", body);
+     }
+
+     Object result = template.requestBody("direct:getColStats", "irrelevantBody");
+     assertTrue(result instanceof Document, "Result is not of type Document");
+     assertTrue(Document.class.cast(result).keySet().size() > 0, "The result should contain keys");
+ }
+
+ @Test
+ public void testCommand() {
+     // Call hostInfo, command working with every configuration
+     Object reply = template.requestBody("direct:command", "{\"hostInfo\":\"1\"}");
+     assertTrue(reply instanceof Document, "Result is not of type Document");
+     assertTrue(!((Document) reply).keySet().isEmpty(), "The result should contain keys");
+ }
+
+ @Test
+ public void testOperationHeader() {
+     // Test that the collection has 0 documents in it
+     assertEquals(0, testCollection.countDocuments());
+
+     // OPERATION_HEADER given as the String "count": a Long reply shows that count ran
+     // instead of the insert operation configured on the route
+     Object viaName = template.requestBodyAndHeader(
+             "direct:insert", "irrelevantBody", MongoDbConstants.OPERATION_HEADER, "count");
+     assertTrue(viaName instanceof Long, "Result is not of type Long");
+     assertEquals(0L, viaName, "Test collection should not contain any records");
+
+     // Same check with the header given as the MongoDbOperation.count value
+     // rather than its name
+     Object viaValue = template.requestBodyAndHeader(
+             "direct:insert", "irrelevantBody", MongoDbConstants.OPERATION_HEADER, MongoDbOperation.count);
+     assertTrue(viaValue instanceof Long, "Result is not of type Long");
+     assertEquals(0L, viaValue, "Test collection should not contain any records");
+
+ }
+
+ @Configuration
+ public class TestConfiguration { // NOTE(review): non-static inner class; presumably relies on the enclosing test class being listed in @SpringBootTest(classes) - confirm
+ // Registers one direct: route per MongoDB operation exercised by the tests of the enclosing class.
+ @Bean
+ public RouteBuilder routeBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:count").to( // the only route here with dynamicity=true
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=count&dynamicity=true");
+ from("direct:insert")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+ from("direct:testStoreOidOnInsert")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert")
+ .setBody() // reply body becomes the value of the OID header
+ .header(MongoDbConstants.OID);
+ from("direct:save")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save");
+ from("direct:testStoreOidOnSave")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save")
+ .setBody() // reply body becomes the value of the OID header
+ .header(MongoDbConstants.OID);
+ from("direct:update")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=update");
+ from("direct:remove")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=remove");
+ from("direct:aggregate").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate");
+ from("direct:getDbStats").to("mongodb:myDb?database={{mongodb.testDb}}&operation=getDbStats"); // no collection parameter on this URI
+ from("direct:getColStats").to(
+ "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getColStats");
+ from("direct:command").to("mongodb:myDb?database={{mongodb.testDb}}&operation=command"); // no collection parameter on this URI
+ }
+ };
+ }
+ }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbOutputTypeIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbOutputTypeIT.java
new file mode 100644
index 0000000..73d8201
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbOutputTypeIT.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+import static org.apache.camel.test.junit5.TestSupport.assertListSize;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.commons.lang3.ObjectUtils;
+
+import org.junit.jupiter.api.Test;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import com.mongodb.client.MongoIterable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+ /**
+  * Integration tests for the {@code outputType} option of the MongoDB endpoint.
+  * <p>
+  * Two producer routes run {@code findAll} with {@code outputType=MongoIterable} and
+  * {@code outputType=DocumentList}; two further tests check that adding a consumer route
+  * with {@code outputType=MongoIterable} is rejected with an
+  * {@link IllegalArgumentException} as the cause.
+  */
+ @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+ @CamelSpringBootTest
+ @SpringBootTest(
+     classes = {
+         CamelAutoConfiguration.class,
+         MongoDbOutputTypeIT.class,
+         MongoDbOutputTypeIT.TestConfiguration.class,
+         AbstractMongoDbITSupport.MongoConfiguration.class
+     }
+ )
+ public class MongoDbOutputTypeIT extends AbstractMongoDbITSupport {
+
+     /**
+      * Pages through the test collection with skip/limit headers and checks that every
+      * reply is a {@link MongoIterable} whose documents carry all expected fields.
+      */
+     @Test
+     public void testFindAllDBCursor() {
+         // The collection must be empty before the fixture data is loaded
+         assertEquals(0, testCollection.countDocuments());
+         pumpDataIntoTestCollection();
+
+         // Ten requests; each one skips a further page of 100 documents
+         final int pageSize = 100;
+         for (int skipped = 0; skipped < 10 * pageSize; skipped += pageSize) {
+             Map<String, Object> pagingHeaders = new HashMap<>();
+             pagingHeaders.put(MongoDbConstants.NUM_TO_SKIP, skipped);
+             pagingHeaders.put(MongoDbConstants.LIMIT, pageSize);
+
+             Object reply = template.requestBodyAndHeaders("direct:findAllDBCursor", ObjectUtils.NULL, pagingHeaders);
+             assertTrue(reply instanceof MongoIterable, "Result is not of type MongoIterable");
+
+             @SuppressWarnings("unchecked")
+             MongoIterable<Document> cursor = (MongoIterable<Document>) reply;
+             for (Document document : cursor) {
+                 assertHasAllFields(document);
+             }
+         }
+     }
+
+     /**
+      * Requests the whole collection as a {@link List} and checks its size and the
+      * fields of every returned document.
+      */
+     @Test
+     public void testFindAllDocumentList() {
+         // The collection must be empty before the fixture data is loaded
+         assertEquals(0, testCollection.countDocuments());
+         pumpDataIntoTestCollection();
+
+         Object reply = template.requestBody("direct:findAllDocumentList", ObjectUtils.NULL);
+         assertTrue(reply instanceof List, "Result is not of type List");
+
+         @SuppressWarnings("unchecked")
+         List<Document> documents = (List<Document>) reply;
+         assertListSize("Result does not contain 1000 elements", documents, 1000);
+
+         for (Document document : documents) {
+             assertHasAllFields(document);
+         }
+
+         // NOTE(review): the route under test publishes to "mock:resultFindAllDocumentList",
+         // so "mock:resultFindAll" has presumably received nothing and this loop body may
+         // never execute. Confirm the intended endpoint and header before relying on it.
+         for (Exchange received : getMockEndpoint("mock:resultFindAll").getReceivedExchanges()) {
+             assertEquals(1000, received.getIn().getHeader(MongoDbConstants.RESULT_TOTAL_SIZE),
+                     "Result total size header should equal 1000");
+         }
+     }
+
+     /**
+      * Adding a {@code findById} consumer route with {@code outputType=MongoIterable}
+      * must fail with an {@link IllegalArgumentException} as the cause.
+      */
+     @Test
+     public void testInitFindWithWrongOutputType() {
+         try {
+             RouteBuilder incompatibleRoute = new RouteBuilder() {
+                 @Override
+                 public void configure() {
+                     from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findById&dynamicity=true&outputType=MongoIterable")
+                             .to("mock:dummy");
+                 }
+             };
+             template.getCamelContext().addRoutes(incompatibleRoute);
+             fail("Endpoint should not be initialized with a non compatible outputType");
+         } catch (Exception rejected) {
+             Throwable cause = rejected.getCause();
+             assertTrue(cause instanceof IllegalArgumentException,
+                     "Exception is not of type IllegalArgumentException");
+         }
+     }
+
+     /**
+      * Adding a tail-tracking consumer route with {@code outputType=MongoIterable}
+      * must fail with an {@link IllegalArgumentException} as the cause.
+      */
+     @Test
+     public void testInitTailWithWrongOutputType() {
+         try {
+             RouteBuilder incompatibleRoute = new RouteBuilder() {
+                 @Override
+                 public void configure() {
+                     from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing&outputType=MongoIterable")
+                             .id("tailableCursorConsumer1")
+                             .autoStartup(false)
+                             .to("mock:test");
+                 }
+             };
+             template.getCamelContext().addRoutes(incompatibleRoute);
+             fail("Endpoint should not be initialized with a non compatible outputType");
+         } catch (Exception rejected) {
+             Throwable cause = rejected.getCause();
+             assertTrue(cause instanceof IllegalArgumentException,
+                     "Exception is not of type IllegalArgumentException");
+         }
+     }
+
+     /** Asserts that the id, {@code scientist} and {@code fixedField} entries are all present. */
+     private static void assertHasAllFields(Document document) {
+         final String message = "Document in returned list should contain all fields";
+         assertNotNull(document.get(MONGO_ID), message);
+         assertNotNull(document.get("scientist"), message);
+         assertNotNull(document.get("fixedField"), message);
+     }
+
+     /** Declares the two {@code findAll} producer routes used by the tests above. */
+     @Configuration
+     public class TestConfiguration {
+
+         @Bean
+         public RouteBuilder routeBuilder() {
+             return new RouteBuilder() {
+                 @Override
+                 public void configure() {
+                     // findAll returning the driver's iterable
+                     from("direct:findAllDBCursor")
+                             .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findAll&dynamicity=true&outputType=MongoIterable")
+                             .to("mock:resultFindAllDBCursor");
+
+                     // findAll returning a list of documents
+                     from("direct:findAllDocumentList")
+                             .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findAll&outputType=DocumentList")
+                             .to("mock:resultFindAllDocumentList");
+                 }
+             };
+         }
+     }
+ }
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbReadPreferenceOptionIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbReadPreferenceOptionIT.java
new file mode 100644
index 0000000..9227216
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbReadPreferenceOptionIT.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import com.mongodb.ReadPreference;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.mongodb.MongoDbEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+ /**
+  * Checks how the {@code readPreference} URI option of a MongoDB endpoint is resolved by
+  * {@code getReadPreferenceBean()}: each supported name must yield the matching
+  * {@link ReadPreference} instance, no option must yield {@code primary}, and an unknown
+  * name must be rejected with an {@link IllegalArgumentException}.
+  */
+ @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+ @CamelSpringBootTest
+ @SpringBootTest(
+     classes = {
+         CamelAutoConfiguration.class,
+         MongoDbReadPreferenceOptionIT.class,
+         AbstractMongoDbITSupport.MongoConfiguration.class
+     }
+ )
+ public class MongoDbReadPreferenceOptionIT extends AbstractMongoDbITSupport {
+
+     /** Endpoint created by the test that is currently running. */
+     private MongoDbEndpoint endpoint;
+
+     @Test
+     public void testInvalidReadPreferenceOptionValue() throws Exception {
+         endpoint = createMongoDbEndpoint("mongodb:myDb?database={{mongodb.testDb}}&readPreference=foo");
+
+         // Resolving the unknown name is what is expected to fail
+         Exception rejection = assertThrows(IllegalArgumentException.class,
+                 () -> endpoint.getReadPreferenceBean());
+
+         assertTrue(rejection.getMessage().startsWith("No match for read preference"));
+     }
+
+     @Test
+     public void testNoReadPreferenceOptionValue() throws Exception {
+         // Without the option the endpoint is expected to fall back to primary
+         assertReadPreference("mongodb:myDb?database={{mongodb.testDb}}", ReadPreference.primary());
+     }
+
+     @Test
+     public void testPrimaryReadPreferenceOptionValue() throws Exception {
+         assertReadPreference("mongodb:myDb?database={{mongodb.testDb}}&readPreference=primary",
+                 ReadPreference.primary());
+     }
+
+     @Test
+     public void testPrimaryPreferredReadPreferenceOptionValue() throws Exception {
+         assertReadPreference("mongodb:myDb?database={{mongodb.testDb}}&readPreference=primaryPreferred",
+                 ReadPreference.primaryPreferred());
+     }
+
+     @Test
+     public void testSecondaryReadPreferenceOptionValue() throws Exception {
+         assertReadPreference("mongodb:myDb?database={{mongodb.testDb}}&readPreference=secondary",
+                 ReadPreference.secondary());
+     }
+
+     @Test
+     public void testSecondaryPreferredReadPreferenceOptionValue() throws Exception {
+         assertReadPreference("mongodb:myDb?database={{mongodb.testDb}}&readPreference=secondaryPreferred",
+                 ReadPreference.secondaryPreferred());
+     }
+
+     @Test
+     public void testNearestReadPreferenceOptionValue() throws Exception {
+         assertReadPreference("mongodb:myDb?database={{mongodb.testDb}}&readPreference=nearest",
+                 ReadPreference.nearest());
+     }
+
+     /**
+      * Creates and starts an endpoint for {@code uri} and asserts that its read
+      * preference is the very same instance as {@code expected}.
+      */
+     private void assertReadPreference(String uri, ReadPreference expected) throws Exception {
+         endpoint = createMongoDbEndpoint(uri);
+         assertSame(expected, endpoint.getReadPreferenceBean());
+     }
+
+     /**
+      * Asks the {@code mongodb} component for an endpoint, starts it and returns it
+      * as a {@link MongoDbEndpoint}.
+      */
+     private MongoDbEndpoint createMongoDbEndpoint(String uri) throws Exception {
+         Endpoint created = context.getComponent("mongodb").createEndpoint(uri);
+         created.start();
+         return (MongoDbEndpoint) created;
+     }
+ }
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbSpringDslOperationsIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbSpringDslOperationsIT.java
new file mode 100644
index 0000000..8351d6f
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbSpringDslOperationsIT.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.SpringCamelContext;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.test.annotation.DirtiesContext;
+
+ /**
+  * Re-runs the tests inherited from {@link MongoDbOperationsIT}; this class declares no
+  * test methods of its own.
+  * <p>
+  * The Spring Boot test context sets {@code camel.springboot.routes-include-pattern} to
+  * {@code mongoBasicOperationsTest.xml} under {@code src/test/resources}.
+  * NOTE(review): presumably the routes are then loaded from that XML file instead of a
+  * Java {@code RouteBuilder} - confirm against the parent class and the XML resource.
+  */
+ @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+ @CamelSpringBootTest
+ @SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbSpringDslOperationsIT.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ },
+ properties = { "camel.springboot.routes-include-pattern=file:src/test/resources/org/apache/camel/component/mongodb/mongoBasicOperationsTest.xml" }
+ )
+ public class MongoDbSpringDslOperationsIT extends MongoDbOperationsIT {
+
+
+ }
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbStopEndpointIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbStopEndpointIT.java
new file mode 100644
index 0000000..7576bb8
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbStopEndpointIT.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+ /**
+  * Verifies that a document inserted through one MongoDB endpoint can still be read
+  * through a separate {@code findById} route after the insert endpoint has been stopped.
+  */
+ @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+ @CamelSpringBootTest
+ @SpringBootTest(
+     classes = {
+         CamelAutoConfiguration.class,
+         MongoDbStopEndpointIT.class,
+         MongoDbStopEndpointIT.TestConfiguration.class,
+         AbstractMongoDbITSupport.MongoConfiguration.class
+     },
+     properties = { "mongodb.testDb=test", "mongodb.testCollection=camelTest" }
+ )
+ public class MongoDbStopEndpointIT extends AbstractMongoDbITSupport {
+
+     /** Value stored in the {@code _id} field of the inserted document. */
+     private static final String MY_ID = "myId";
+
+     /** URI of the insert endpoint; used by the insert route and stopped by the test. */
+     String endpoint = "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert";
+
+     @Test
+     public void testStopEndpoint() {
+         assertEquals(0, testCollection.countDocuments());
+
+         // Insert one document with a known id
+         final String newtonJson = "{\"scientist\": \"Newton\", \"_id\": \"" + MY_ID + "\"}";
+         template.requestBody("direct:insertJsonString", newtonJson);
+
+         // Stop the insert endpoint, then read the document back through the find route
+         context.getEndpoint(endpoint).stop();
+         Document found = template.requestBody("direct:findById", MY_ID, Document.class);
+
+         assertEquals(MY_ID, found.get(MONGO_ID));
+         assertEquals("Newton", found.get("scientist"));
+     }
+
+     /** Declares the insert route and the findById route used by the test. */
+     @Configuration
+     public class TestConfiguration {
+
+         @Bean
+         public RouteBuilder routeBuilder() {
+             return new RouteBuilder() {
+                 @Override
+                 public void configure() {
+                     from("direct:insertJsonString")
+                             .routeId("insert")
+                             .to(endpoint);
+
+                     from("direct:findById")
+                             .routeId("find")
+                             .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findById&dynamicity=true");
+                 }
+             };
+         }
+     }
+ }
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbTailableCursorConsumerIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbTailableCursorConsumerIT.java
new file mode 100644
index 0000000..4ad4b70
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbTailableCursorConsumerIT.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import static com.mongodb.client.model.Filters.eq;
+
+import org.apache.camel.Route;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.mongodb.MongoDbTailTrackingConfig;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.CreateCollectionOptions;
+
+import java.util.Calendar;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbTailableCursorConsumerIT.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbTailableCursorConsumerIT extends AbstractMongoDbITSupport {
+
+ private MongoCollection<Document> cappedTestCollection;
+ private String cappedTestCollectionName = "camelTestCapped";
+
+ /** Runs the shared thousand-records scenario against route {@code tailableCursorConsumer1}. */
+ @Test
+ public void testThousandRecordsWithoutReadPreference() throws Exception {
+     final String routeId = "tailableCursorConsumer1";
+     testThousandRecordsWithRouteId(routeId);
+ }
+
+ /**
+  * Runs the shared thousand-records scenario against route
+  * {@code tailableCursorConsumer1.readPreference}.
+  */
+ @Test
+ public void testThousandRecordsWithReadPreference() throws Exception {
+     final String routeId = "tailableCursorConsumer1.readPreference";
+     testThousandRecordsWithRouteId(routeId);
+ }
+
+ /**
+  * Starts the tailable consumer route on an empty capped collection and expects the
+  * mock endpoint to receive no messages.
+  */
+ @Test
+ public void testNoRecords() throws Exception {
+     assertEquals(0, cappedTestCollection.countDocuments());
+
+     // Capped collection: 1000000000 bytes, at most 1000 documents
+     db.createCollection(cappedTestCollectionName,
+             new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000));
+     cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+     assertEquals(0, cappedTestCollection.countDocuments());
+
+     addTestRoutes();
+     final String routeId = "tailableCursorConsumer1";
+     context.getRouteController().startRoute(routeId);
+
+     MockEndpoint testMock = getMockEndpoint("mock:test");
+     testMock.expectedMessageCount(0);
+     // Leave the consumer running for a second before checking that nothing arrived
+     Thread.sleep(1000);
+     testMock.assertIsSatisfied();
+
+     context.getRouteController().stopRoute(routeId);
+ }
+
+ /**
+  * Inserts 5000 documents in five bursts of 1000, pausing 500 ms before each burst,
+  * and expects the tailable consumer to deliver all 5000 to the mock endpoint.
+  */
+ @Test
+ public void testMultipleBursts() throws Exception {
+     assertEquals(0, cappedTestCollection.countDocuments());
+
+     // Capped collection: 1000000000 bytes, at most 1000 documents
+     db.createCollection(cappedTestCollectionName,
+             new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000));
+     cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+     addTestRoutes();
+     context.getRouteController().startRoute("tailableCursorConsumer1");
+
+     MockEndpoint testMock = getMockEndpoint("mock:test");
+     testMock.expectedMessageCount(5000);
+
+     final int burstCount = 5;
+     final int burstSize = 1000;
+     Thread pump = new Thread(() -> {
+         for (int burst = 0; burst < burstCount; burst++) {
+             // pause before every burst, including the first one
+             try {
+                 Thread.sleep(500);
+             } catch (InterruptedException e) {
+                 return;
+             }
+             final int firstValue = burst * burstSize;
+             for (int i = firstValue; i < firstValue + burstSize; i++) {
+                 cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+             }
+         }
+     });
+
+     // run the pump to completion before asserting
+     pump.start();
+     pump.join();
+
+     testMock.assertIsSatisfied();
+     context.getRouteController().stopRoute("tailableCursorConsumer1");
+ }
+
+ /**
+  * Continuously inserts 100000 documents into the capped collection while the tailable
+  * consumer route is running, checking and resetting the mock endpoint after every
+  * 1000 inserts so the mock does not accumulate all exchanges.
+  * <p>
+  * The incremental checks run on the pump thread. Any failure raised there (including
+  * {@link AssertionError}, which {@code catch (Exception)} would not catch) is captured
+  * and rethrown on the test thread; otherwise the test would pass even though a check
+  * failed.
+  *
+  * @throws Exception if route control or joining the pump thread fails
+  */
+ @Test
+ public void testHundredThousandRecords() throws Exception {
+     assertEquals(0, cappedTestCollection.countDocuments());
+
+     // Capped collection: 1000000000 bytes, at most 1000 documents
+     db.createCollection(cappedTestCollectionName,
+             new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000));
+     cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+     addTestRoutes();
+     context.getRouteController().startRoute("tailableCursorConsumer1");
+
+     final MockEndpoint mock = getMockEndpoint("mock:test");
+     mock.expectedMessageCount(1000);
+
+     // Single-slot holder for a failure raised on the pump thread; it is only read
+     // after join(), which guarantees the write is visible to this thread.
+     final Throwable[] pumpFailure = new Throwable[1];
+
+     Thread t = new Thread(() -> {
+         try {
+             for (int i = 1; i <= 100000; i++) {
+                 cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+
+                 // incrementally assert, as the mock endpoint stores all
+                 // messages and otherwise the test would be sluggish
+                 if (i % 1000 == 0) {
+                     assertAndResetMockEndpoint(mock);
+                 }
+             }
+         } catch (Throwable failure) {
+             // stop pumping and hand the failure over to the test thread
+             pumpFailure[0] = failure;
+         }
+     });
+
+     // start the data pumping
+     t.start();
+     try {
+         // before we stop the route, wait for the data pumping to end
+         t.join();
+     } finally {
+         context.getRouteController().stopRoute("tailableCursorConsumer1");
+     }
+
+     if (pumpFailure[0] != null) {
+         throw new AssertionError("Data pump thread failed: " + pumpFailure[0], pumpFailure[0]);
+     }
+ }
+
+ /**
+  * Checks persistent tail tracking across a route restart: after 300 documents are
+  * consumed and the route is restarted, only the next 300 documents (starting at
+  * {@code increasing=301}) are delivered, and the value stored for persistentId
+  * {@code darwin} in the default tracking collection is 300 before the final stop and
+  * 600 after it.
+  * <p>
+  * Waiting for the route to reach {@link ServiceStatus#Stopped} is bounded and sleeps
+  * between polls, so a route that never stops fails the test instead of spinning forever.
+  *
+  * @throws Exception if route control, the mock assertions or thread joins fail
+  */
+ @Test
+ public void testPersistentTailTrack() throws Exception {
+     assertEquals(0, cappedTestCollection.countDocuments());
+
+     // drop the tracking collection
+     db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION).drop();
+     // Capped collection: 1000000000 bytes, at most 1000 documents
+     db.createCollection(cappedTestCollectionName,
+             new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000));
+     cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+     cappedTestCollection.createIndex(new Document("increasing", 1));
+
+     addTestRoutes();
+     context.getRouteController().startRoute("tailableCursorConsumer2");
+
+     final MockEndpoint mock = getMockEndpoint("mock:test");
+
+     mock.expectedMessageCount(300);
+     // pump 300 records
+     Thread t = new Thread(() -> {
+         for (int i = 1; i <= 300; i++) {
+             cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+         }
+     });
+
+     // start the data pumping
+     t.start();
+     // before we continue wait for the data pump to end
+     t.join();
+     mock.assertIsSatisfied();
+     mock.reset();
+     context.getRouteController().stopRoute("tailableCursorConsumer2");
+     awaitRouteStopped("tailableCursorConsumer2");
+     context.getRouteController().startRoute("tailableCursorConsumer2");
+
+     // expect 300 messages and not 600
+     mock.expectedMessageCount(300);
+     // pump 300 records
+     t = new Thread(() -> {
+         for (int i = 301; i <= 600; i++) {
+             cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+         }
+     });
+     // start the data pumping
+     t.start();
+     // before we continue wait for the data pump to end
+     t.join();
+     mock.assertIsSatisfied();
+
+     // check that the first message received in this second batch
+     // corresponds to increasing=301
+     Object firstBody = mock.getExchanges().get(0).getIn().getBody();
+     assertTrue(firstBody instanceof Document);
+     assertEquals(301, Document.class.cast(firstBody).get("increasing"));
+
+     // check that the lastVal is persisted at the right time: check before
+     // and after stopping the route
+     assertEquals(300, db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION).find(eq("persistentId", "darwin"))
+             .first().get("lastTrackingValue"));
+     // stop the route and verify the last value has been updated
+     context.getRouteController().stopRoute("tailableCursorConsumer2");
+     awaitRouteStopped("tailableCursorConsumer2");
+     assertEquals(600, db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION).find(eq("persistentId", "darwin"))
+             .first().get("lastTrackingValue"));
+ }
+
+ /**
+  * Polls the route status until it is {@link ServiceStatus#Stopped} or 30 seconds have
+  * elapsed, sleeping 20 ms between polls, then asserts that the route is stopped.
+  *
+  * @param routeId id of the route to wait for
+  * @throws InterruptedException if the waiting thread is interrupted while sleeping
+  */
+ private void awaitRouteStopped(String routeId) throws InterruptedException {
+     final long timeoutNanos = 30L * 1_000_000_000L;
+     final long startedAt = System.nanoTime();
+     while (context.getRouteController().getRouteStatus(routeId) != ServiceStatus.Stopped
+             && System.nanoTime() - startedAt < timeoutNanos) {
+         Thread.sleep(20);
+     }
+     assertEquals(ServiceStatus.Stopped, context.getRouteController().getRouteStatus(routeId),
+             "Route " + routeId + " did not stop in time");
+ }
+
+ /**
+  * Same scenario as persistent tail tracking with an integer field, but the
+  * {@code increasing} field holds dates: document {@code i} is stamped
+  * {@code startTimestamp + i minutes}. The value stored for persistentId {@code darwin}
+  * in the default tracking collection must be start+300min after the first stop and
+  * start+600min after the second, and the second batch must begin at start+301min.
+  */
+ @Test
+ public void testPersistentTailTrackIncreasingDateField() throws Exception {
+     assertEquals(0, cappedTestCollection.countDocuments());
+     final Calendar startTimestamp = Calendar.getInstance();
+
+     // start from a freshly dropped default tracking collection
+     MongoCollection<Document> tracking = db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION, Document.class);
+     tracking.drop();
+     tracking = db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION, Document.class);
+
+     // Capped collection: 1000000000 bytes, at most 1000 documents
+     db.createCollection(cappedTestCollectionName,
+             new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000));
+     cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+     addTestRoutes();
+     context.getRouteController().startRoute("tailableCursorConsumer2");
+
+     final MockEndpoint testMock = getMockEndpoint("mock:test");
+     testMock.expectedMessageCount(300);
+
+     // first batch: minutes 1..300
+     Thread pump = new Thread(() -> {
+         for (int i = 1; i <= 300; i++) {
+             cappedTestCollection.insertOne(
+                     new Document("increasing", shiftedByMinutes(startTimestamp, i).getTime()).append("string", "value" + i));
+         }
+     });
+     pump.start();
+     pump.join();
+     testMock.assertIsSatisfied();
+     testMock.reset();
+
+     // after stopping, the persisted value must be startTimestamp + 300min
+     Calendar after300 = shiftedByMinutes(startTimestamp, 300);
+     context.getRouteController().stopRoute("tailableCursorConsumer2");
+     assertEquals(after300.getTime(),
+             tracking.find(eq("persistentId", "darwin")).first().get(MongoDbTailTrackingConfig.DEFAULT_FIELD));
+     context.getRouteController().startRoute("tailableCursorConsumer2");
+
+     // second batch: expect 300 messages and not 600
+     testMock.expectedMessageCount(300);
+     pump = new Thread(() -> {
+         for (int i = 301; i <= 600; i++) {
+             cappedTestCollection.insertOne(
+                     new Document("increasing", shiftedByMinutes(startTimestamp, i).getTime()).append("string", "value" + i));
+         }
+     });
+     pump.start();
+     pump.join();
+     testMock.assertIsSatisfied();
+
+     // the first document of the second batch is the one stamped start + 301min
+     Object firstBody = testMock.getExchanges().get(0).getIn().getBody();
+     assertTrue(firstBody instanceof Document);
+     Calendar after301 = shiftedByMinutes(startTimestamp, 301);
+     assertEquals(after301.getTime(), ((Document) firstBody).get("increasing"));
+
+     // after the final stop, the persisted value must be startTimestamp + 600min
+     context.getRouteController().stopRoute("tailableCursorConsumer2");
+     Calendar after600 = shiftedByMinutes(startTimestamp, 600);
+     assertEquals(after600.getTime(),
+             tracking.find(eq("persistentId", "darwin")).first().get(MongoDbTailTrackingConfig.DEFAULT_FIELD));
+ }
+
+ /** Returns a copy of {@code base} moved forward by {@code minutes}; {@code base} is not modified. */
+ private static Calendar shiftedByMinutes(Calendar base, int minutes) {
+     Calendar shifted = (Calendar) base.clone();
+     shifted.add(Calendar.MINUTE, minutes);
+     return shifted;
+ }
+
+ /**
+  * Checks tail tracking when the tracking data lives in a custom location: database
+  * {@code einstein}, collection {@code curie}, field {@code newton}. The stored value
+  * for persistentId {@code darwin} must be 300 after the first stop and 600 after the
+  * second, and the second batch must start at {@code increasing=301}.
+  */
+ @Test
+ public void testCustomTailTrackLocation() throws Exception {
+     assertEquals(0, cappedTestCollection.countDocuments());
+
+     // start from a freshly dropped custom tracking collection
+     // (tailTrackDb=einstein&tailTrackCollection=curie&tailTrackField=newton)
+     MongoCollection<Document> tracking = mongo.getDatabase("einstein").getCollection("curie", Document.class);
+     tracking.drop();
+     tracking = mongo.getDatabase("einstein").getCollection("curie", Document.class);
+
+     // Capped collection: 1000000000 bytes, at most 1000 documents
+     db.createCollection(cappedTestCollectionName,
+             new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000));
+     cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+     addTestRoutes();
+     context.getRouteController().startRoute("tailableCursorConsumer3");
+
+     final MockEndpoint testMock = getMockEndpoint("mock:test");
+     testMock.expectedMessageCount(300);
+
+     // first batch: increasing = 1..300
+     Thread pump = new Thread(() -> {
+         for (int i = 1; i <= 300; i++) {
+             cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+         }
+     });
+     pump.start();
+     pump.join();
+     testMock.assertIsSatisfied();
+     testMock.reset();
+
+     // stop the route so the last value gets persisted, then check it;
+     // "newton" is the name of the tracking field in use
+     context.getRouteController().stopRoute("tailableCursorConsumer3");
+     assertEquals(300, tracking.find(eq("persistentId", "darwin")).first().get("newton"));
+     context.getRouteController().startRoute("tailableCursorConsumer3");
+
+     // second batch: expect 300 messages and not 600
+     testMock.expectedMessageCount(300);
+     pump = new Thread(() -> {
+         for (int i = 301; i <= 600; i++) {
+             cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+         }
+     });
+     pump.start();
+     pump.join();
+     testMock.assertIsSatisfied();
+
+     // the first body must carry increasing=301 and not increasing=1,
+     // i.e. consumption did not restart from the top
+     Object firstBody = testMock.getExchanges().get(0).getIn().getBody();
+     assertTrue(firstBody instanceof Document);
+     assertEquals(301, ((Document) firstBody).get("increasing"));
+
+     // after the final stop the persisted value in "newton" must be 600
+     context.getRouteController().stopRoute("tailableCursorConsumer3");
+     assertEquals(600, tracking.find(eq("persistentId", "darwin")).first().get("newton"));
+ }
+
+ public void assertAndResetMockEndpoint(MockEndpoint mock) throws Exception { // asserts the expectations set on the given mock endpoint, then resets it
+ mock.assertIsSatisfied(); // runs before reset() so the assertion sees the expectations configured by the caller
+ mock.reset();
+ }
+
+ private void testThousandRecordsWithRouteId(String routeId) throws Exception {
+ assertEquals(0, cappedTestCollection.countDocuments());
+
+ // create a capped collection with max = 1000
+ // DocumentBuilder.start().add("capped", true).add("size",
+ // 1000000000).add("max", 1000).get()
+ db.createCollection(cappedTestCollectionName,
+ new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000));
+ cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+ for (int i = 0; i < 1000; i++) {
+ cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+ }
+ assertEquals(1000, cappedTestCollection.countDocuments());
+
+ addTestRoutes();
+ context.getRouteController().startRoute(routeId);
+ MockEndpoint mock = getMockEndpoint("mock:test");
+ mock.expectedMessageCount(1000);
+ Thread.sleep(1000);
+ mock.assertIsSatisfied();
+ context.getRouteController().stopRoute(routeId);
+ }
+
+ @BeforeEach
+ public void before() {
+     // every test starts without the capped collection and creates what it needs itself
+     final MongoCollection<Document> capped = db.getCollection(cappedTestCollectionName, Document.class);
+     cappedTestCollection = capped;
+     capped.drop();
+ }
+
+ protected void addTestRoutes() throws Exception { // registers the four consumer routes used by these tests; each is created with autoStartup(false) and sends to mock:test
+ context.addRoutes(new RouteBuilder() {
+
+ @Override
+ public void configure() {
+
+ from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing") // tracks the "increasing" field, no persistence options
+ .id("tailableCursorConsumer1")
+ .autoStartup(false).to("mock:test");
+
+ from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing&persistentTailTracking=true&persistentId=darwin") // adds persistentTailTracking=true with persistentId "darwin"
+ .id("tailableCursorConsumer2").autoStartup(false).to("mock:test");
+
+ from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing&"
+ + "persistentTailTracking=true&persistentId=darwin&tailTrackDb=einstein&tailTrackCollection=curie&tailTrackField=newton") // persistent tracking with custom tailTrackDb/tailTrackCollection/tailTrackField
+ .id("tailableCursorConsumer3")
+ .autoStartup(false).to("mock:test");
+
+ from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing")// &readPreference=primary")
+ .id("tailableCursorConsumer1.readPreference").autoStartup(false).to("mock:test"); // NOTE(review): readPreference is commented out above, so this URI equals tailableCursorConsumer1's
+ }
+ });
+ }
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/meta/integration/MongoDbMetaExtensionIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/meta/integration/MongoDbMetaExtensionIT.java
new file mode 100644
index 0000000..d21e466
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/meta/integration/MongoDbMetaExtensionIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.meta.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.mongodb.client.model.CreateCollectionOptions;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.ValidationOptions;
+import org.apache.camel.component.extension.MetaDataExtension;
+import org.apache.camel.component.mongodb.MongoDbComponent;
+import org.apache.camel.component.mongodb.integration.AbstractMongoDbITSupport;
+import org.apache.camel.component.mongodb.integration.MongoDbDynamicityIT;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Integration tests for the {@link MetaDataExtension} exposed by the MongoDB component.
+ * <p>
+ * The tests use the shared client, service and credentials inherited from {@link AbstractMongoDbITSupport}.
+ */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        MongoDbMetaExtensionIT.class,
+        AbstractMongoDbITSupport.MongoConfiguration.class
+    }
+)
+public class MongoDbMetaExtensionIT extends AbstractMongoDbITSupport {
+
+    // We simulate the presence of an authenticated user
+    @BeforeEach
+    @Override
+    public void createAuthorizationUser() {
+        super.createAuthorizationUser();
+    }
+
+    /**
+     * Looks up the MongoDB component registered in the Camel context.
+     *
+     * @return the component registered under {@code SCHEME}
+     */
+    protected MongoDbComponent getComponent() {
+        return context.getComponent(SCHEME, MongoDbComponent.class);
+    }
+
+    /**
+     * Returns the component's metadata extension, failing with a descriptive error instead of a bare
+     * {@code NoSuchElementException} when the component does not provide one.
+     */
+    private MetaDataExtension metaDataExtension() {
+        return getComponent().getExtension(MetaDataExtension.class)
+                .orElseThrow(() -> new IllegalStateException("MongoDbComponent does not expose a MetaDataExtension"));
+    }
+
+    /**
+     * Builds the parameter map handed to the extension: target database and collection plus the host and
+     * credentials of the test service.
+     */
+    private Map<String, Object> connectionParameters(String database, String collection) {
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("database", database);
+        parameters.put("collection", collection);
+        parameters.put("host", service.getConnectionAddress());
+        parameters.put("user", USER);
+        parameters.put("password", PASSWORD);
+        return parameters;
+    }
+
+    @Test
+    public void testValidMetaData() {
+        // Given a collection created with a JSON schema validator
+        final String database = "test";
+        final String collection = "validatedCollection";
+        Document jsonSchema = Document.parse("{ \n"
+                + " bsonType: \"object\", \n"
+                + " required: [ \"name\", \"surname\", \"email\" ], \n"
+                + " properties: { \n"
+                + " name: { \n"
+                + " bsonType: \"string\", \n"
+                + " description: \"required and must be a string\" }, \n"
+                + " surname: { \n"
+                + " bsonType: \"string\", \n"
+                + " description: \"required and must be a string\" }, \n"
+                + " email: { \n"
+                + " bsonType: \"string\", \n"
+                + " pattern: \"^.+@.+$\", \n"
+                + " description: \"required and must be a valid email address\" }, \n"
+                + " year_of_birth: { \n"
+                + " bsonType: \"int\", \n"
+                + " minimum: 1900, \n"
+                + " maximum: 2018,\n"
+                + " description: \"the value must be in the range 1900-2018\" }, \n"
+                + " gender: { \n"
+                + " enum: [ \"M\", \"F\" ], \n"
+                + " description: \"can be only M or F\" } \n"
+                + " }}");
+        ValidationOptions collOptions = new ValidationOptions().validator(Filters.jsonSchema(jsonSchema));
+        AbstractMongoDbITSupport.mongo.getDatabase(database).createCollection(collection,
+                new CreateCollectionOptions().validationOptions(collOptions));
+
+        // When
+        MetaDataExtension.MetaData result = metaDataExtension().meta(connectionParameters(database, collection))
+                .orElseThrow(UnsupportedOperationException::new);
+
+        // Then
+        assertEquals("application/schema+json", result.getAttribute(MetaDataExtension.MetaData.CONTENT_TYPE));
+        assertEquals(JsonNode.class, result.getAttribute(MetaDataExtension.MetaData.JAVA_TYPE));
+        assertNotNull(result.getPayload(JsonNode.class));
+        assertNotNull(result.getPayload(JsonNode.class).get("properties"));
+        assertNotNull(result.getPayload(JsonNode.class).get("$schema"));
+        assertEquals("http://json-schema.org/schema#", result.getPayload(JsonNode.class).get("$schema").asText());
+        assertNotNull(result.getPayload(JsonNode.class).get("id"));
+        assertNotNull(result.getPayload(JsonNode.class).get("type"));
+    }
+
+    @Test
+    public void testMissingCollection() {
+        // Given a collection name that is never created by this test
+        final String database = "test";
+        final String collection = "missingCollection";
+
+        // When
+        final Optional<MetaDataExtension.MetaData> meta
+                = metaDataExtension().meta(connectionParameters(database, collection));
+
+        // Then the lookup yields no metadata, so orElseThrow must raise the supplied exception
+        assertThrows(IllegalArgumentException.class, () -> meta.orElseThrow(IllegalArgumentException::new));
+    }
+
+    @Test
+    public void testMissingParameters() {
+        // Given no parameters at all
+        Map<String, Object> parameters = new HashMap<>();
+
+        // resolved outside the lambda so that only the meta(...) call itself is under test
+        final MetaDataExtension metaDataExtension = metaDataExtension();
+
+        // When / Then
+        assertThrows(IllegalArgumentException.class, () -> metaDataExtension.meta(parameters));
+    }
+
+    @Test
+    public void testNotValidatedCollection() {
+        // Given a collection created without any validator
+        final String database = "test";
+        final String collection = "notValidatedCollection";
+        AbstractMongoDbITSupport.mongo.getDatabase(database).createCollection(collection);
+
+        // When
+        final Optional<MetaDataExtension.MetaData> meta
+                = metaDataExtension().meta(connectionParameters(database, collection));
+
+        // Then the lookup yields no metadata, so orElseThrow must raise the supplied exception
+        assertThrows(UnsupportedOperationException.class, () -> meta.orElseThrow(UnsupportedOperationException::new));
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/processor/idempotent/MongoDbIdempotentRepositoryIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/processor/idempotent/MongoDbIdempotentRepositoryIT.java
new file mode 100644
index 0000000..24bb00a
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/processor/idempotent/MongoDbIdempotentRepositoryIT.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.processor.idempotent;
+
+import java.util.UUID;
+
+import org.apache.camel.component.mongodb.integration.AbstractMongoDbITSupport;
+import org.apache.camel.component.mongodb.meta.integration.MongoDbMetaExtensionIT;
+import org.apache.camel.component.mongodb.processor.idempotent.MongoDbIdempotentRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for {@link MongoDbIdempotentRepository}, backed by the test collection inherited from
+ * {@link AbstractMongoDbITSupport}.
+ */
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+    classes = {
+        CamelAutoConfiguration.class,
+        MongoDbIdempotentRepositoryIT.class,
+        AbstractMongoDbITSupport.MongoConfiguration.class
+    }
+)
+public class MongoDbIdempotentRepositoryIT extends AbstractMongoDbITSupport {
+
+    // repository under test, re-created around every test by clearDB()
+    MongoDbIdempotentRepository repo;
+
+    /**
+     * Empties the test collection and creates a fresh repository; runs both before and after each test.
+     */
+    @BeforeEach
+    @AfterEach
+    public void clearDB() {
+        testCollection.deleteMany(new Document());
+        repo = new MongoDbIdempotentRepository(mongo, testCollectionName, dbName);
+    }
+
+    @Test
+    public void add() {
+        String randomUUIDString = UUID.randomUUID().toString();
+
+        boolean added = repo.add(randomUUIDString);
+        assertEquals(1, testCollection.countDocuments(), "Driver inserted document");
+        assertTrue(added, "Add uid returned true");
+    }
+
+    @Test
+    public void addAndContains() {
+        String randomUUIDString = UUID.randomUUID().toString();
+
+        repo.add(randomUUIDString);
+        assertEquals(1, testCollection.countDocuments());
+
+        boolean found = repo.contains(randomUUIDString);
+        assertTrue(found, "Added uid was found");
+    }
+
+    @Test
+    public void addAndRemove() {
+        String randomUUIDString = UUID.randomUUID().toString();
+
+        repo.add(randomUUIDString);
+        assertEquals(1, testCollection.countDocuments());
+
+        boolean removed = repo.remove(randomUUIDString);
+        assertTrue(removed, "Added uid was removed correctly");
+        assertEquals(0, testCollection.countDocuments());
+    }
+
+    @Test
+    public void addDuplicatedFails() {
+        String randomUUIDString = UUID.randomUUID().toString();
+
+        repo.add(randomUUIDString);
+        assertEquals(1, testCollection.countDocuments());
+
+        // adding the same key a second time must be rejected and must not create a second document
+        boolean added = repo.add(randomUUIDString);
+        assertFalse(added, "Duplicated entry was not added");
+        assertEquals(1, testCollection.countDocuments());
+    }
+
+    @Test
+    public void deleteMissingiIsFailse() {
+        String randomUUIDString = UUID.randomUUID().toString();
+        assertEquals(0, testCollection.countDocuments());
+        boolean removed = repo.remove(randomUUIDString);
+        assertFalse(removed, "Non existing uid returns false");
+    }
+
+    @Test
+    public void containsMissingReturnsFalse() {
+        String randomUUIDString = UUID.randomUUID().toString();
+        boolean found = repo.contains(randomUUIDString);
+        assertFalse(found, "Non existing item is not found");
+    }
+
+    @Test
+    public void confirmAllwaysReturnsTrue() {
+        String randomUUIDString = UUID.randomUUID().toString();
+        boolean found = repo.confirm(randomUUIDString);
+        assertTrue(found, "Confirm always returns true");
+
+        found = repo.confirm(null);
+        assertTrue(found, "Confirm always returns true, even with null");
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/verifier/integration/MongoDbVerifierExtensionIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/verifier/integration/MongoDbVerifierExtensionIT.java
new file mode 100644
index 0000000..2e543dc
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/verifier/integration/MongoDbVerifierExtensionIT.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.verifier.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Component;
+import org.apache.camel.component.extension.ComponentVerifierExtension;
+import org.apache.camel.component.mongodb.integration.AbstractMongoDbITSupport;
+import org.apache.camel.component.mongodb.processor.idempotent.MongoDbIdempotentRepositoryIT;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+ classes = {
+ CamelAutoConfiguration.class,
+ MongoDbVerifierExtensionIT.class,
+ AbstractMongoDbITSupport.MongoConfiguration.class
+ }
+)
+public class MongoDbVerifierExtensionIT extends AbstractMongoDbITSupport {
+ // We simulate the presence of an authenticated user
+ @BeforeEach
+ public void createAuthorizationUser() {
+ super.createAuthorizationUser();
+ }
+
+ protected ComponentVerifierExtension getExtension() {
+ Component component = context.getComponent(SCHEME);
+ ComponentVerifierExtension verifier
+ = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+ return verifier;
+ }
+
+ @Test
+ public void verifyConnectionOK() {
+ //When
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("host", service.getConnectionAddress());
+ parameters.put("user", USER);
+ parameters.put("password", PASSWORD);
+ //Given
+ ComponentVerifierExtension.Result result
+ = getExtension().verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+ //Then
+ assertEquals(ComponentVerifierExtension.Result.Status.OK, result.getStatus());
+ }
+
+ @Test
+ public void verifyConnectionKO() {
+ //When
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("host", "notReachable.host");
+ parameters.put("user", USER);
+ parameters.put("password", PASSWORD);
+ //Given
+ ComponentVerifierExtension.Result result
+ = getExtension().verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+ //Then
+ assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+ assertTrue(result.getErrors().get(0).getDescription().startsWith("Unable to connect"));
+ }
+
+ @Test
+ public void verifyConnectionMissingParams() {
+ //When
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("host", service.getConnectionAddress());
+ parameters.put("user", USER);
+ //Given
+ ComponentVerifierExtension.Result result
+ = getExtension().verify(ComponentVerifierExtension.Scope.PARAMETERS, parameters);
+ //Then
+ assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+ assertTrue(result.getErrors().get(0).getDescription().startsWith("password should be set"));
+ }
+
+ @Test
+ public void verifyConnectionNotAuthenticated() {
+ //When
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("host", service.getConnectionAddress());
+ parameters.put("user", USER);
+ parameters.put("password", "wrongPassword");
+ //Given
+ ComponentVerifierExtension.Result result
+ = getExtension().verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+ //Then
+ assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+ assertTrue(result.getErrors().get(0).getDescription().startsWith("Unable to authenticate"));
+ }
+
+ @Test
+ public void verifyConnectionAdminDBKO() {
+ //When
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("host", service.getConnectionAddress());
+ parameters.put("user", USER);
+ parameters.put("password", PASSWORD);
+ parameters.put("adminDB", "someAdminDB");
+ //Given
+ ComponentVerifierExtension.Result result
+ = getExtension().verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+ //Then
+ assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+ assertTrue(result.getErrors().get(0).getDescription().startsWith("Unable to authenticate"));
+ }
+
+ @Test
+ public void verifyConnectionPortKO() {
+ //When
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("host", "localhost:12343");
+ parameters.put("user", USER);
+ parameters.put("password", PASSWORD);
+ //Given
+ ComponentVerifierExtension.Result result
+ = getExtension().verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+ //Then
+ assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+ assertTrue(result.getErrors().get(0).getDescription().startsWith("Unable to connect"));
+ }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/resources/log4j2.properties b/components-starter/camel-mongodb-starter/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..0689548
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/resources/log4j2.properties
@@ -0,0 +1,33 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+## File appender "file": writes to target/camel-mongodb-test.log
+appender.file.type = File
+appender.file.name = file
+appender.file.fileName = target/camel-mongodb-test.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+## Console appender "out": defined here, but only used if the rootLogger reference at the bottom is uncommented
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+
+## Explicit level for the MongoDbContainer logger
+logger.mongodb.name = org.apache.camel.component.mongodb.MongoDbContainer
+logger.mongodb.level = INFO
+
+## Root logger: INFO, attached to the file appender only
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = file
+#rootLogger.appenderRef.out.ref = out
diff --git a/components-starter/camel-mongodb-starter/src/test/resources/mongodb.test.properties b/components-starter/camel-mongodb-starter/src/test/resources/mongodb.test.properties
new file mode 100644
index 0000000..a80317c
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/resources/mongodb.test.properties
@@ -0,0 +1,23 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+
+## Database and collection names; the test routes reference them through {{mongodb.*}} placeholders in their endpoint URIs
+mongodb.testDb=test
+mongodb.testCollection=camelTest
+mongodb.cappedTestCollection=camelTestCapped
+
+## $match stage selecting entries whose fullDocument.string equals 'value2'
+## NOTE(review): presumably consumed by the change-streams consumer test - confirm against that test
+myStreamFilter = { '$match':{'$or':[{'fullDocument.string': 'value2'}]} }
diff --git a/components-starter/camel-mongodb-starter/src/test/resources/org/apache/camel/component/mongodb/mongoBasicOperationsTest.xml b/components-starter/camel-mongodb-starter/src/test/resources/org/apache/camel/component/mongodb/mongoBasicOperationsTest.xml
new file mode 100644
index 0000000..9f08fa0
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/resources/org/apache/camel/component/mongodb/mongoBasicOperationsTest.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<routes id="camel" xmlns="http://camel.apache.org/schema/spring">
+    <!--
+        One direct: entry point per MongoDB operation. Query-string separators inside the uri attributes
+        are written as &amp; because a bare ampersand is not allowed in an XML attribute value.
+    -->
+    <route>
+        <from uri="direct:count" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=count&amp;dynamicity=true" />
+    </route>
+
+    <route>
+        <from uri="direct:insert" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=insert" /> <!-- &writeConcern=SAFE" /> -->
+    </route>
+
+    <!-- same as direct:insert, then replaces the body with the CamelMongoOid header -->
+    <route>
+        <from uri="direct:testStoreOidOnInsert" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=insert" /> <!-- &writeConcern=SAFE" /> -->
+        <setBody>
+            <header>CamelMongoOid</header>
+        </setBody>
+    </route>
+
+    <route>
+        <from uri="direct:save" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=save" /> <!-- &writeConcern=SAFE" /> -->
+    </route>
+
+    <!-- same as direct:save, then replaces the body with the CamelMongoOid header -->
+    <route>
+        <from uri="direct:testStoreOidOnSave" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=save" /> <!-- &writeConcern=SAFE" /> -->
+        <setBody>
+            <header>CamelMongoOid</header>
+        </setBody>
+    </route>
+
+    <route>
+        <from uri="direct:update" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=update" /> <!-- &writeConcern=SAFE" /> -->
+    </route>
+
+    <route>
+        <from uri="direct:remove" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=remove" /> <!-- &writeConcern=SAFE" /> -->
+    </route>
+
+    <route>
+        <from uri="direct:aggregate" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=aggregate" /> <!-- &writeConcern=SAFE" /> -->
+    </route>
+
+    <route>
+        <from uri="direct:getDbStats" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;operation=getDbStats" />
+    </route>
+
+    <route>
+        <from uri="direct:getColStats" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=getColStats" />
+    </route>
+
+    <route>
+        <from uri="direct:command" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;operation=command" />
+    </route>
+</routes>