You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2022/03/17 10:02:52 UTC

[camel-spring-boot] branch main updated: [CAMEL-17801] Add tests in camel-mongodb-starter

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git


The following commit(s) were added to refs/heads/main by this push:
     new 1214b25  [CAMEL-17801] Add tests in camel-mongodb-starter
1214b25 is described below

commit 1214b25a2de7449286ae32f33c4b7c991919e318
Author: Croway <fe...@gmail.com>
AuthorDate: Tue Mar 15 18:03:05 2022 +0100

    [CAMEL-17801] Add tests in camel-mongodb-starter
---
 components-starter/camel-mongodb-starter/pom.xml   |  76 ++++
 .../integration/AbstractMongoDbITSupport.java      | 179 ++++++++
 .../integration/MongoDbAggregateOperationIT.java   | 149 +++++++
 .../integration/MongoDbBigDecimalConverterIT.java  |  88 ++++
 .../integration/MongoDbBulkWriteOperationIT.java   | 155 +++++++
 .../MongoDbChangeStreamsConsumerIT.java            | 178 ++++++++
 .../integration/MongoDbConnectionBeansIT.java      | 101 +++++
 .../mongodb/integration/MongoDbConversionsIT.java  | 184 ++++++++
 .../MongoDbCredentialsFromUriConnectionIT.java     | 131 ++++++
 .../mongodb/integration/MongoDbDynamicityIT.java   | 188 +++++++++
 .../integration/MongoDbExceptionHandlingIT.java    | 124 ++++++
 .../integration/MongoDbFindOperationIT.java        | 460 ++++++++++++++++++++
 .../integration/MongoDbHeaderHandlingIT.java       | 160 +++++++
 .../mongodb/integration/MongoDbIndexIT.java        | 244 +++++++++++
 .../mongodb/integration/MongoDbInsertBatchIT.java  | 101 +++++
 .../mongodb/integration/MongoDbOperationsIT.java   | 460 ++++++++++++++++++++
 .../mongodb/integration/MongoDbOutputTypeIT.java   | 167 ++++++++
 .../integration/MongoDbReadPreferenceOptionIT.java |  99 +++++
 .../integration/MongoDbSpringDslOperationsIT.java  |  44 ++
 .../mongodb/integration/MongoDbStopEndpointIT.java |  82 ++++
 .../MongoDbTailableCursorConsumerIT.java           | 463 +++++++++++++++++++++
 .../meta/integration/MongoDbMetaExtensionIT.java   | 174 ++++++++
 .../idempotent/MongoDbIdempotentRepositoryIT.java  | 127 ++++++
 .../integration/MongoDbVerifierExtensionIT.java    | 151 +++++++
 .../src/test/resources/log4j2.properties           |  33 ++
 .../src/test/resources/mongodb.test.properties     |  23 +
 .../component/mongodb/mongoBasicOperationsTest.xml |  81 ++++
 27 files changed, 4422 insertions(+)

diff --git a/components-starter/camel-mongodb-starter/pom.xml b/components-starter/camel-mongodb-starter/pom.xml
index 7909167..938e1d7 100644
--- a/components-starter/camel-mongodb-starter/pom.xml
+++ b/components-starter/camel-mongodb-starter/pom.xml
@@ -39,6 +39,20 @@
       <artifactId>camel-mongodb</artifactId>
       <version>${camel-version}</version>
     </dependency>
+    <!-- test infra -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-infra-mongodb</artifactId>
+      <version>${camel-version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3-version}</version>
+      <scope>test</scope>
+    </dependency>
     <!--START OF GENERATED CODE-->
     <dependency>
       <groupId>org.apache.camel.springboot</groupId>
@@ -46,4 +60,66 @@
     </dependency>
     <!--END OF GENERATED CODE-->
   </dependencies>
+  <profiles>
+    <!-- activate integration test if the docker socket file is accessible -->
+    <profile>
+      <id>mongodb-integration-tests-docker-file</id>
+      <activation>
+        <file>
+          <exists>/var/run/docker.sock</exists>
+        </file>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <configuration>
+              <systemPropertyVariables>
+                <visibleassertions.silence>true</visibleassertions.silence>
+              </systemPropertyVariables>
+            </configuration>
+            <executions>
+              <execution>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <!-- activate integration test if the DOCKER_HOST env var is set -->
+    <profile>
+      <id>mongodb-integration-tests-docker-env</id>
+      <activation>
+        <property>
+          <name>env.DOCKER_HOST</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <configuration>
+              <systemPropertyVariables>
+                <visibleassertions.silence>true</visibleassertions.silence>
+              </systemPropertyVariables>
+            </configuration>
+            <executions>
+              <execution>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/AbstractMongoDbITSupport.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/AbstractMongoDbITSupport.java
new file mode 100644
index 0000000..46ac9aa
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/AbstractMongoDbITSupport.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.mongodb.CamelMongoDbException;
+import org.apache.camel.component.mongodb.MongoDbComponent;
+import org.apache.camel.test.infra.mongodb.services.MongoDBService;
+import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import org.bson.Document;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+import java.util.Formatter;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public abstract class AbstractMongoDbITSupport {
+
+	protected static final String SCHEME = "mongodb";
+	protected static final String USER = "test-user";
+	protected static final String PASSWORD = "test-pwd";
+	@RegisterExtension
+	public static MongoDBService service = MongoDBServiceFactory.createService();
+	protected static MongoClient mongo;
+	protected static MongoDatabase db;
+	protected static MongoCollection<Document> testCollection;
+	protected static MongoCollection<Document> dynamicCollection;
+
+	protected static String dbName = "test";
+	protected static String testCollectionName;
+	protected static String dynamicCollectionName;
+	protected ProducerTemplate template;
+	@Autowired
+	protected CamelContext context;
+
+	@BeforeAll
+	public static void beforeAll() {
+		mongo = MongoClients.create(service.getReplicaSetUrl());
+		db = mongo.getDatabase(dbName);
+	}
+
+	@BeforeEach
+	public void beforeEach() {
+		// Refresh the test collection - drop it and recreate it. We don't do
+		// this for the database because MongoDB would create large
+		// store files each time
+		testCollectionName = "camelTest";
+		testCollection = db.getCollection(testCollectionName, Document.class);
+		testCollection.drop();
+		testCollection = db.getCollection(testCollectionName, Document.class);
+
+		dynamicCollectionName = testCollectionName.concat("Dynamic");
+		dynamicCollection = db.getCollection(dynamicCollectionName, Document.class);
+		dynamicCollection.drop();
+		dynamicCollection = db.getCollection(dynamicCollectionName, Document.class);
+
+		template = context.createProducerTemplate();
+
+		context.getPropertiesComponent().setLocation("classpath:mongodb.test.properties");
+	}
+
+	@AfterEach
+	public void tearDown() throws Exception {
+		testCollection.drop();
+		dynamicCollection.drop();
+	}
+
+	/**
+	 * Useful to simulate the presence of an authenticated user with name {@value #USER} and password {@value #PASSWORD}
+	 */
+	protected void createAuthorizationUser() {
+		createAuthorizationUser("admin", USER, PASSWORD);
+	}
+
+	protected void createAuthorizationUser(String database, String user, String password) {
+		MongoDatabase adminDb = mongo.getDatabase("admin");
+		MongoCollection<Document> usersCollection = adminDb.getCollection("system.users");
+		if (usersCollection.countDocuments(new Document("user", user)) == 0) {
+			MongoDatabase db = mongo.getDatabase(database);
+			Map<String, Object> commandArguments = new LinkedHashMap<>();
+			commandArguments.put("createUser", user);
+			commandArguments.put("pwd", password);
+			String[] roles = {"readWrite"};
+			commandArguments.put("roles", roles);
+			BasicDBObject command = new BasicDBObject(commandArguments);
+			db.runCommand(command);
+		}
+	}
+
+	protected void pumpDataIntoTestCollection() {
+		// there should be 100 of each
+		String[] scientists
+				= {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
+		for (int i = 1; i <= 1000; i++) {
+			int index = i % scientists.length;
+			Formatter f = new Formatter();
+			String doc
+					= f.format("{\"_id\":\"%d\", \"scientist\":\"%s\", \"fixedField\": \"fixedValue\"}", i, scientists[index])
+					.toString();
+			IOHelper.close(f);
+			testCollection.insertOne(Document.parse(doc));
+		}
+		assertEquals(1000L, testCollection.countDocuments(), "Data pumping of 1000 entries did not complete entirely");
+	}
+
+	protected CamelMongoDbException extractAndAssertCamelMongoDbException(Object result, String message) {
+		assertTrue(result instanceof Throwable, "Result is not an Exception");
+		assertTrue(result instanceof CamelExecutionException, "Result is not an CamelExecutionException");
+		Throwable exc = ((CamelExecutionException) result).getCause();
+		assertTrue(exc instanceof CamelMongoDbException, "Result is not an CamelMongoDbException");
+		CamelMongoDbException camelExc = ObjectHelper.cast(CamelMongoDbException.class, exc);
+		if (message != null) {
+			assertTrue(camelExc.getMessage().contains(message), "CamelMongoDbException doesn't contain desired message string");
+		}
+		return camelExc;
+	}
+
+	protected MockEndpoint getMockEndpoint(String endpoint) {
+		return context.getEndpoint(endpoint, MockEndpoint.class);
+	}
+
+	@Configuration
+	public class MongoConfiguration {
+
+		@Bean
+		public MongoClient mongoClient() {
+			return mongo;
+		}
+
+		@Bean
+		public MongoDbComponent mongoDbComponent() {
+			MongoDbComponent component = new MongoDbComponent();
+			component.setMongoConnection(mongo);
+
+			return component;
+		}
+
+		@Bean
+		public void propertyLocation() {
+			context.getPropertiesComponent().setLocation("classpath:mongodb.test.properties");
+		}
+	}
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbAggregateOperationIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbAggregateOperationIT.java
new file mode 100644
index 0000000..09c37ca
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbAggregateOperationIT.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.mongodb.client.MongoIterable;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.test.junit5.TestSupport.assertListSize;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbAggregateOperationIT.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class,
+                MongoDbAggregateOperationIT.TestConfiguration.class
+        }
+)
+public class MongoDbAggregateOperationIT extends AbstractMongoDbITSupport {
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:aggregate")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate");
+                    from("direct:aggregateDBCursor")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate&dynamicity=true&outputType=MongoIterable")
+                            .to("mock:resultAggregateDBCursor");
+                }
+            };
+        }
+    }
+
+    @Test
+    public void testAggregate() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+        pumpDataIntoTestCollection();
+
+        // result sorted by _id
+        Object result = template
+                .requestBody("direct:aggregate",
+                        "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},"
+                                                 + "{ $group: { _id: \"$scientist\", count: { $sum: 1 }} },{ $sort : { _id : 1}} ]");
+
+        assertTrue(result instanceof List, "Result is not of type List");
+
+        @SuppressWarnings("unchecked")
+        List<Document> resultList = (List<Document>) result;
+        assertListSize("Result does not contain 2 elements", resultList, 2);
+
+        assertEquals("Darwin", resultList.get(0).get("_id"), "First result Document._id should be Darwin");
+        assertEquals(100, resultList.get(0).get("count"), "First result Document.count should be 100");
+        assertEquals("Einstein", resultList.get(1).get("_id"), "Second result Document._id should be Einstein");
+        assertEquals(100, resultList.get(1).get("count"), "Second result Document.count should be 100");
+    }
+
+    @Test
+    public void testAggregateDBCursor() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+        pumpDataIntoTestCollection();
+
+        Object result = template
+                .requestBody("direct:aggregateDBCursor",
+                        "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}}]");
+
+        assertTrue(result instanceof MongoIterable, "Result is not of type DBCursor");
+
+        MongoIterable<Document> resultCursor = (MongoIterable<Document>) result;
+        // Ensure that all returned documents contain all fields
+        int count = 0;
+        for (Document document : resultCursor) {
+            assertNotNull(document.get("_id"), "Document in returned list should contain all fields");
+            assertNotNull(document.get("scientist"), "Document in returned list should contain all fields");
+            assertNotNull(document.get("fixedField"), "Document in returned list should contain all fields");
+            count++;
+        }
+        assertEquals(200, count, "Result does not contain 200 elements");
+    }
+
+    @Test
+    public void testAggregateWithOptions() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+        pumpDataIntoTestCollection();
+
+        Map<String, Object> options = new HashMap<>();
+        options.put(MongoDbConstants.BATCH_SIZE, 10);
+        options.put(MongoDbConstants.ALLOW_DISK_USE, true);
+
+        Object result = template
+                .requestBodyAndHeaders("direct:aggregateDBCursor",
+                        "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}}]", options);
+
+        assertTrue(result instanceof MongoIterable, "Result is not of type DBCursor");
+
+        MongoIterable<Document> resultCursor = (MongoIterable<Document>) result;
+
+        // Ensure that all returned documents contain all fields
+        int count = 0;
+        for (Document document : resultCursor) {
+            assertNotNull(document.get("_id"), "Document in returned list should contain all fields");
+            assertNotNull(document.get("scientist"), "Document in returned list should contain all fields");
+            assertNotNull(document.get("fixedField"), "Document in returned list should contain all fields");
+            count++;
+        }
+        assertEquals(200, count, "Result does not contain 200 elements");
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbBigDecimalConverterIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbBigDecimalConverterIT.java
new file mode 100644
index 0000000..4e42660
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbBigDecimalConverterIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import java.math.BigDecimal;
+
+import com.mongodb.BasicDBObject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbBigDecimalConverterIT.class,
+                MongoDbBigDecimalConverterIT.TestConfiguration.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbBigDecimalConverterIT extends AbstractMongoDbITSupport {
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:insert")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+                }
+            };
+        }
+    }
+
+    private class NumberClass {
+        // CHECKSTYLE:OFF
+        public String _id = "testBigDecimalConvert";
+        // CHECKSTYLE:ON
+
+        public BigDecimal aNumber = new BigDecimal(0);
+
+        public BigDecimal bNumber = new BigDecimal(12345L);
+    }
+
+    @Test
+    public void testBigDecimalAutoConversion() {
+        assertEquals(0, testCollection.countDocuments());
+        NumberClass testClass = new NumberClass();
+        Object result = template.requestBody("direct:insert", testClass);
+        assertTrue(result instanceof Document);
+        Document b = testCollection.find(new BasicDBObject("_id", testClass._id)).first();
+        assertNotNull(b, "No record with 'testInsertString' _id");
+
+        assertEquals(new BigDecimal((double) b.get("aNumber")), testClass.aNumber);
+        assertEquals(testClass.bNumber, new BigDecimal((double) b.get("bNumber")));
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbBulkWriteOperationIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbBulkWriteOperationIT.java
new file mode 100644
index 0000000..43a4a87
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbBulkWriteOperationIT.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.mongodb.bulk.BulkWriteResult;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.WriteModel;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbBulkWriteOperationIT.class,
+                MongoDbBulkWriteOperationIT.TestConfiguration.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbBulkWriteOperationIT extends AbstractMongoDbITSupport {
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:bulkWrite").to(
+                            "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=bulkWrite");
+                    from("direct:unorderedBulkWrite").setHeader(MongoDbConstants.BULK_ORDERED).constant(false)
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=bulkWrite");
+                }
+            };
+        }
+    }
+
+    @Test
+    public void testBulkWrite() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+        pumpDataIntoTestCollection();
+        List<WriteModel<Document>> bulkOperations = Arrays
+                .asList(new InsertOneModel<>(new Document("scientist", "Pierre Curie")),
+                        new UpdateOneModel<>(
+                                new Document("_id", "2"),
+                                new Document("$set", new Document("scientist", "Charles Darwin"))),
+                        new UpdateManyModel<>(
+                                new Document("scientist", "Curie"),
+                                new Document("$set", new Document("scientist", "Marie Curie"))),
+                        new ReplaceOneModel<>(new Document("_id", "1"), new Document("scientist", "Albert Einstein")),
+                        new DeleteOneModel<>(new Document("_id", "3")),
+                        new DeleteManyModel<>(new Document("scientist", "Bohr")));
+
+        BulkWriteResult result = template.requestBody("direct:bulkWrite", bulkOperations, BulkWriteResult.class);
+
+        assertNotNull(result);
+        // 1 insert
+        assertEquals(1, result.getInsertedCount(), "Records inserted should be 2 : ");
+        // 1 updateOne + 100 updateMany + 1 replaceOne
+        assertEquals(102, result.getMatchedCount(), "Records matched should be 102 : ");
+        assertEquals(102, result.getModifiedCount(), "Records modified should be 102 : ");
+        // 1 deleteOne + 100 deleteMany
+        assertEquals(101, result.getDeletedCount(), "Records deleted should be 101 : ");
+    }
+
+    @Test
+    public void testOrderedBulkWriteWithError() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+        pumpDataIntoTestCollection();
+
+        List<WriteModel<Document>> bulkOperations = Arrays
+                .asList(new InsertOneModel<>(new Document("scientist", "Pierre Curie")),
+                        // this insert failed and bulk stop
+                        new InsertOneModel<>(new Document("_id", "1")),
+                        new InsertOneModel<>(new Document("scientist", "Descartes")),
+                        new UpdateOneModel<>(
+                                new Document("_id", "5"), new Document("$set", new Document("scientist", "Marie Curie"))),
+                        new DeleteOneModel<>(new Document("_id", "2")));
+
+        try {
+            template.requestBody("direct:bulkWrite", bulkOperations, BulkWriteResult.class);
+            fail("Bulk operation should throw Exception");
+        } catch (CamelExecutionException e) {
+            extractAndAssertCamelMongoDbException(e, "duplicate key error");
+            // count = 1000 records + 1 inserted
+            assertEquals(1001, testCollection.countDocuments());
+        }
+    }
+
+    @Test
+    public void testUnorderedBulkWriteWithError() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+        pumpDataIntoTestCollection();
+
+        List<WriteModel<Document>> bulkOperations = Arrays
+                .asList(new InsertOneModel<>(new Document("scientist", "Pierre Curie")),
+                        // this insert failed and bulk continue
+                        new InsertOneModel<>(new Document("_id", "1")),
+                        new InsertOneModel<>(new Document("scientist", "Descartes")),
+                        new UpdateOneModel<>(
+                                new Document("_id", "5"), new Document("$set", new Document("scientist", "Marie Curie"))),
+                        new DeleteOneModel<>(new Document("_id", "2")));
+        try {
+            template.requestBody("direct:unorderedBulkWrite", bulkOperations, BulkWriteResult.class);
+            fail("Bulk operation should throw Exception");
+        } catch (CamelExecutionException e) {
+            extractAndAssertCamelMongoDbException(e, "duplicate key error");
+            // count = 1000 + 2 inserted + 1 deleted
+            assertEquals(1001, testCollection.countDocuments());
+        }
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
new file mode 100644
index 0000000..e773ccd
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbChangeStreamsConsumerIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.CreateCollectionOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.bson.types.ObjectId;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbChangeStreamsConsumerIT.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbChangeStreamsConsumerIT extends AbstractMongoDbITSupport {
+
+    private MongoCollection<Document> mongoCollection;
+    private String collectionName;
+
+    @BeforeEach
+    public void before() {
+        collectionName = "camelTest";
+        mongoCollection = db.getCollection(collectionName, Document.class);
+        mongoCollection.drop();
+
+        CreateCollectionOptions collectionOptions = new CreateCollectionOptions();
+        db.createCollection(collectionName, collectionOptions);
+        mongoCollection = db.getCollection(collectionName, Document.class);
+    }
+
+    @AfterEach
+    public void after() {
+        getMockEndpoint("mock:test").reset();
+    }
+
+    @Test
+    public void basicTest() throws Exception {
+        assertEquals(0, mongoCollection.countDocuments());
+
+        String consumerRouteId = "simpleConsumer";
+        addTestRoutes();
+        context.getRouteController().startRoute(consumerRouteId);
+
+        MockEndpoint mock = getMockEndpoint("mock:test");
+        mock.expectedMessageCount(10);
+
+        Thread t = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                mongoCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+            }
+        });
+
+        t.start();
+        t.join();
+
+        mock.assertIsSatisfied();
+        context.getRouteController().stopRoute(consumerRouteId);
+    }
+
+    @Test
+    public void filterTest() throws Exception {
+        assertEquals(0, mongoCollection.countDocuments());
+
+        String consumerRouteId = "filterConsumer";
+        addTestRoutes();
+        context.getRouteController().startRoute(consumerRouteId);
+
+        MockEndpoint mock = getMockEndpoint("mock:test");
+        mock.expectedMessageCount(1);
+
+        Thread t = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                mongoCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+            }
+        });
+
+        t.start();
+        t.join();
+
+        mock.assertIsSatisfied();
+
+        Document actualDocument = mock.getExchanges().get(0).getIn().getBody(Document.class);
+        assertEquals("value2", actualDocument.get("string"));
+        context.getRouteController().stopRoute(consumerRouteId);
+    }
+
+    @Test
+    public void operationTypeAndIdHeaderTest() throws Exception {
+        assertEquals(0, mongoCollection.countDocuments());
+
+        String consumerRouteId = "simpleConsumer";
+        addTestRoutes();
+        context.getRouteController().startRoute(consumerRouteId);
+
+        MockEndpoint mock = getMockEndpoint("mock:test");
+        mock.expectedMessageCount(2);
+
+        ObjectId objectId = new ObjectId();
+        Thread t = new Thread(() -> {
+            mongoCollection.insertOne(new Document("_id", objectId).append("string", "value"));
+            mongoCollection.deleteOne(new Document("_id", objectId));
+        });
+
+        t.start();
+        t.join();
+
+        mock.assertIsSatisfied();
+
+        Exchange insertExchange = mock.getExchanges().get(0);
+        assertEquals("insert", insertExchange.getIn().getHeader("CamelMongoDbStreamOperationType"));
+        assertEquals(objectId, insertExchange.getIn().getHeader("_id"));
+
+        Exchange deleteExchange = mock.getExchanges().get(1);
+        Document deleteBodyDocument = deleteExchange.getIn().getBody(Document.class);
+        String deleteBody = "{\"_id\": \"" + objectId.toHexString() + "\"}";
+        assertEquals("delete", deleteExchange.getIn().getHeader("CamelMongoDbStreamOperationType"));
+        assertEquals(objectId, deleteExchange.getIn().getHeader("_id"));
+        assertEquals(1, deleteBodyDocument.size());
+        assertTrue(deleteBodyDocument.containsKey("_id"));
+        assertEquals(objectId.toHexString(), deleteBodyDocument.getObjectId("_id").toHexString());
+        context.getRouteController().stopRoute(consumerRouteId);
+    }
+
+    protected void addTestRoutes() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from("mongodb:myDb?consumerType=changeStreams&database={{mongodb.testDb}}&collection={{mongodb.testCollection}}")
+                        .id("simpleConsumer")
+                        .autoStartup(false)
+                        .to("mock:test");
+
+                from("mongodb:myDb?consumerType=changeStreams&database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&streamFilter={{myStreamFilter}}")
+                        .id("filterConsumer")
+                        .autoStartup(false)
+                        .log("${body}")
+                        .to("mock:test");
+            }
+        });
+    }
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbConnectionBeansIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbConnectionBeansIT.java
new file mode 100644
index 0000000..c424c0a
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbConnectionBeansIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.mongodb.MongoDbComponent;
+import org.apache.camel.component.mongodb.MongoDbEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbConnectionBeansIT.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbConnectionBeansIT extends AbstractMongoDbITSupport {
+    @Test
+    public void checkConnectionFromProperties() {
+        MongoClient client = MongoClients.create(service.getReplicaSetUrl());
+
+        context.getComponent(SCHEME, MongoDbComponent.class).setMongoConnection(null);
+        context.getRegistry().bind("myDb", client);
+
+        MongoDbEndpoint testEndpoint = context.getEndpoint("mongodb:anyName?mongoConnection=#myDb", MongoDbEndpoint.class);
+
+        assertNotEquals("myDb", testEndpoint.getConnectionBean());
+        assertEquals(client, testEndpoint.getMongoConnection());
+    }
+
+    @Test
+    public void checkConnectionFromBean() {
+        MongoClient client = MongoClients.create(service.getReplicaSetUrl());
+
+        context.getComponent(SCHEME, MongoDbComponent.class).setMongoConnection(null);
+        context.getRegistry().bind("myDb", client);
+
+        MongoDbEndpoint testEndpoint = context.getEndpoint("mongodb:myDb", MongoDbEndpoint.class);
+        assertEquals("myDb", testEndpoint.getConnectionBean());
+        assertEquals(client, testEndpoint.getMongoConnection());
+    }
+
+    @Test
+    public void checkConnectionBothExisting() {
+        MongoClient client1 = MongoClients.create(service.getReplicaSetUrl());
+        MongoClient client2 = MongoClients.create(service.getReplicaSetUrl());
+
+        context.getComponent(SCHEME, MongoDbComponent.class).setMongoConnection(null);
+        context.getRegistry().bind("myDb", client1);
+        context.getRegistry().bind("myDbS", client2);
+
+        MongoDbEndpoint testEndpoint = context.getEndpoint("mongodb:myDb?mongoConnection=#myDbS", MongoDbEndpoint.class);
+        MongoClient myDbS = context.getRegistry().lookupByNameAndType("myDbS", MongoClient.class);
+
+        assertEquals("myDb", testEndpoint.getConnectionBean());
+        assertEquals(myDbS, testEndpoint.getMongoConnection());
+    }
+
+    @Test
+    public void checkMissingConnection() {
+        context.getComponent(SCHEME, MongoDbComponent.class).setMongoConnection(null);
+        assertThrows(Exception.class, () -> context.getEndpoint("mongodb:anythingNotRelated", MongoDbEndpoint.class));
+    }
+
+    @Test
+    public void checkConnectionOnComponent() {
+        Endpoint endpoint = context.getEndpoint("mongodb:justARouteName");
+
+        assertIsInstanceOf(MongoDbEndpoint.class, endpoint);
+        assertEquals(mongo, ((MongoDbEndpoint) endpoint).getMongoConnection());
+    }
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbConversionsIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbConversionsIT.java
new file mode 100644
index 0000000..ffd7b22
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbConversionsIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.component.mongodb.converters.MongoDbBasicConverters;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static com.mongodb.client.model.Filters.eq;
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbConversionsIT.class,
+                MongoDbConversionsIT.TestConfiguration.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbConversionsIT extends AbstractMongoDbITSupport {
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:insertMap")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+                    from("direct:insertPojo")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+                    from("direct:insertJsonString")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+                    from("direct:insertJsonStringWriteResultInString")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert")
+                            .convertBodyTo(String.class);
+                }
+            };
+        }
+    }
+
+    @Test
+    public void testInsertMap() {
+        assertEquals(0, testCollection.countDocuments());
+
+        Map<String, Object> m1 = new HashMap<>();
+        Map<String, String> m1Nested = new HashMap<>();
+
+        m1Nested.put("nested1", "nestedValue1");
+        m1Nested.put("nested2", "nestedValue2");
+
+        m1.put("field1", "value1");
+        m1.put("field2", "value2");
+        m1.put("nestedField", m1Nested);
+        m1.put(MONGO_ID, "testInsertMap");
+
+        // Object result =
+        template.requestBody("direct:insertMap", m1);
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertMap")).first();
+        assertNotNull(b, "No record with 'testInsertMap' _id");
+    }
+
+    @Test
+    public void testInsertPojo() {
+        assertEquals(0, testCollection.countDocuments());
+        // Object result =
+        template.requestBody("direct:insertPojo", new MyPojoTest());
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertPojo")).first();
+        assertNotNull(b, "No record with 'testInsertPojo' _id");
+    }
+
+    @Test
+    public void testInsertJsonString() {
+        assertEquals(0, testCollection.countDocuments());
+        // Object result =
+        template.requestBody("direct:insertJsonString",
+                "{\"fruits\": [\"apple\", \"banana\", \"papaya\"], \"veggie\": \"broccoli\", \"_id\": \"testInsertJsonString\"}");
+        // assertTrue(result instanceof WriteResult);
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertJsonString")).first();
+        assertNotNull(b, "No record with 'testInsertJsonString' _id");
+    }
+
+    @Test
+    public void testInsertJsonInputStream() throws Exception {
+        assertEquals(0, testCollection.countDocuments());
+        // Object result =
+        template.requestBody("direct:insertJsonString",
+                IOConverter.toInputStream(
+                        "{\"fruits\": [\"apple\", \"banana\"], \"veggie\": \"broccoli\", \"_id\": \"testInsertJsonString\"}\n",
+                        null));
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertJsonString")).first();
+        assertNotNull(b, "No record with 'testInsertJsonString' _id");
+    }
+
+    @Test
+    public void testInsertJsonInputStreamWithSpaces() throws Exception {
+        assertEquals(0, testCollection.countDocuments());
+        template.requestBody("direct:insertJsonString",
+                IOConverter.toInputStream("    {\"test\": [\"test\"], \"_id\": \"testInsertJsonStringWithSpaces\"}\n", null));
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertJsonStringWithSpaces")).first();
+        assertNotNull(b, "No record with 'testInsertJsonStringWithSpaces' _id");
+    }
+
+    @Test
+    public void testInsertBsonInputStream() {
+        assertEquals(0, testCollection.countDocuments());
+
+        Document document = new Document(MONGO_ID, "testInsertBsonString");
+
+        // Object result =
+        template.requestBody("direct:insertJsonString", new ByteArrayInputStream(document.toJson().getBytes()));
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertBsonString")).first();
+        assertNotNull(b, "No record with 'testInsertBsonString' _id");
+    }
+
+    @SuppressWarnings("unused")
+    private class MyPojoTest {
+        public int number = 123;
+        public String text = "hello";
+        public String[] array = { "daVinci", "copernico", "einstein" };
+        // CHECKSTYLE:OFF
+        public String _id = "testInsertPojo";
+        // CHECKSTYLE:ON
+    }
+
+    @Test
+    public void shouldConvertJsonStringListToBSONList() {
+        String jsonListArray = "[{\"key\":\"value1\"}, {\"key\":\"value2\"}]";
+        List<Bson> bsonList = MongoDbBasicConverters.fromStringToList(jsonListArray);
+        assertNotNull(bsonList);
+        assertEquals(2, bsonList.size());
+
+        String jsonEmptyArray = "[]";
+        bsonList = MongoDbBasicConverters.fromStringToList(jsonEmptyArray);
+        assertNotNull(bsonList);
+        assertEquals(0, bsonList.size());
+    }
+
+    @Test
+    public void shouldNotConvertJsonStringListToBSONList() {
+        String jsonSingleValue = "{\"key\":\"value1\"}";
+        List<Bson> bsonList = MongoDbBasicConverters.fromStringToList(jsonSingleValue);
+        assertNull(bsonList);
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbCredentialsFromUriConnectionIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbCredentialsFromUriConnectionIT.java
new file mode 100644
index 0000000..be5f1a8
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbCredentialsFromUriConnectionIT.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.mongodb.integration;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+// Test class performs the same tests as DBOperationsIT but with modified URIs
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbCredentialsFromUriConnectionIT.class,
+                MongoDbCredentialsFromUriConnectionIT.TestConfiguration.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbCredentialsFromUriConnectionIT extends MongoDbOperationsIT {
+
+    protected static final String AUTH_SOURCE_USER = "auth-source-user";
+    protected static final String AUTH_SOURCE_PASSWORD = "auth-source-password";
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    String uriHostnameOnly = String.format("mongodb:mongo?hosts=%s&", service.getConnectionAddress());
+                    //connecting with credentials for created user
+                    String uriWithCredentials = String.format("%susername=%s&password=%s&", uriHostnameOnly, USER, PASSWORD);
+
+                    String uriWithAuthSource = String.format(
+                            "%susername=%s&password=%s&authSource=%s&",
+                            uriHostnameOnly, AUTH_SOURCE_USER, AUTH_SOURCE_PASSWORD, dbName);
+
+                    from("direct:count").to(
+                            uriHostnameOnly + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=count&dynamicity=true");
+                    from("direct:insert")
+                            .to(uriWithCredentials
+                                    + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+                    from("direct:testStoreOidOnInsert")
+                            .to(uriHostnameOnly
+                                    + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert")
+                            .setBody()
+                            .header(MongoDbConstants.OID);
+                    from("direct:save")
+                            .to(uriWithCredentials
+                                    + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save");
+                    from("direct:testStoreOidOnSave")
+                            .to(uriWithCredentials
+                                    + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save")
+                            .setBody()
+                            .header(MongoDbConstants.OID);
+                    from("direct:update")
+                            .to(uriWithCredentials
+                                    + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=update");
+                    from("direct:remove")
+                            .to(uriWithCredentials
+                                    + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=remove");
+                    from("direct:aggregate").to(
+                            uriHostnameOnly + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate");
+                    from("direct:getDbStats").to(uriWithCredentials + "database={{mongodb.testDb}}&operation=getDbStats");
+                    from("direct:getColStats").to(
+                            uriWithCredentials + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getColStats");
+                    from("direct:command").to(uriWithCredentials + "database={{mongodb.testDb}}&operation=command");
+                    from("direct:testAuthSource")
+                            .to(uriWithAuthSource
+                                    + "database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=count&dynamicity=true");
+                }
+            };
+        }
+    }
+
+    @BeforeEach
+    public void before() {
+        createAuthorizationUser();
+        createAuthorizationUser(dbName, AUTH_SOURCE_USER, AUTH_SOURCE_PASSWORD);
+    }
+
+    @Test
+    public void testCountOperationAuthUser() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+        Object result = template.requestBody("direct:testAuthSource", "irrelevantBody");
+        assertTrue(result instanceof Long, "Result is not of type Long");
+        assertEquals(0L, result, "Test collection should not contain any records");
+
+        // Insert a record and test that the endpoint now returns 1
+        testCollection.insertOne(Document.parse("{a:60}"));
+        result = template.requestBody("direct:testAuthSource", "irrelevantBody");
+        assertTrue(result instanceof Long, "Result is not of type Long");
+        assertEquals(1L, result, "Test collection should contain 1 record");
+        testCollection.deleteOne(new Document());
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbDynamicityIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbDynamicityIT.java
new file mode 100644
index 0000000..eb98706
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbDynamicityIT.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.StreamSupport;
+
+import com.mongodb.client.MongoCollection;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static com.mongodb.client.model.Filters.eq;
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbDynamicityIT.class,
+                MongoDbDynamicityIT.TestConfiguration.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbDynamicityIT extends AbstractMongoDbITSupport {
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:noDynamicity")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+                    from("direct:noDynamicityExplicit").to(
+                            "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=false");
+                    from("direct:dynamicityEnabled").to(
+                            "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=true");
+                }
+            };
+        }
+    }
+
+    @Test
+    public void testInsertDynamicityDisabled() {
+        assertEquals(0, testCollection.countDocuments());
+        mongo.getDatabase("otherDB").drop();
+        db.getCollection("otherCollection").drop();
+        assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+                "The otherDB database should not exist");
+
+        String body = "{\"_id\": \"testInsertDynamicityDisabled\", \"a\" : \"1\"}";
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(MongoDbConstants.DATABASE, "otherDB");
+        headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+        // Object result =
+        template.requestBodyAndHeaders("direct:noDynamicity", body, headers);
+
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityDisabled")).first();
+        assertNotNull(b, "No record with 'testInsertDynamicityDisabled' _id");
+
+        body = "{\"_id\": \"testInsertDynamicityDisabledExplicitly\", \"a\" : \"1\"}";
+        // result =
+        template.requestBodyAndHeaders("direct:noDynamicityExplicit", body, headers);
+
+        b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityDisabledExplicitly")).first();
+        assertNotNull(b, "No record with 'testInsertDynamicityDisabledExplicitly' _id");
+
+        assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+                "The otherDB database should not exist");
+
+    }
+
+    @Test
+    public void testInsertDynamicityEnabledDBOnly() {
+        assertEquals(0, testCollection.countDocuments());
+        mongo.getDatabase("otherDB").drop();
+        db.getCollection("otherCollection").drop();
+        assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+                "The otherDB database should not exist");
+
+        String body = "{\"_id\": \"testInsertDynamicityEnabledDBOnly\", \"a\" : \"1\"}";
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(MongoDbConstants.DATABASE, "otherDB");
+        // Object result =
+        template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+        MongoCollection<Document> localDynamicCollection
+                = mongo.getDatabase("otherDB").getCollection(testCollection.getNamespace().getCollectionName(), Document.class);
+
+        Document b = localDynamicCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledDBOnly")).first();
+        assertNotNull(b, "No record with 'testInsertDynamicityEnabledDBOnly' _id");
+
+        b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledDBOnly")).first();
+        assertNull(b, "There is a record with 'testInsertDynamicityEnabledDBOnly' _id in the test collection");
+
+        assertTrue(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+                "The otherDB database should exist");
+
+    }
+
+    @Test
+    public void testInsertDynamicityEnabledCollectionOnly() {
+        assertEquals(0, testCollection.countDocuments());
+        mongo.getDatabase("otherDB").drop();
+        db.getCollection("otherCollection").drop();
+        assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+                "The otherDB database should not exist");
+
+        String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionOnly\", \"a\" : \"1\"}";
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+        // Object result =
+        template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+        MongoCollection<Document> loaclDynamicCollection = db.getCollection("otherCollection", Document.class);
+
+        Document b = loaclDynamicCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledCollectionOnly")).first();
+        assertNotNull(b, "No record with 'testInsertDynamicityEnabledCollectionOnly' _id");
+
+        b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledDBOnly")).first();
+        assertNull(b, "There is a record with 'testInsertDynamicityEnabledCollectionOnly' _id in the test collection");
+
+        assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+                "The otherDB database should not exist");
+    }
+
+    @Test
+    public void testInsertDynamicityEnabledDBAndCollection() {
+        assertEquals(0, testCollection.countDocuments());
+        mongo.getDatabase("otherDB").drop();
+        db.getCollection("otherCollection").drop();
+        assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+                "The otherDB database should not exist");
+
+        String body = "{\"_id\": \"testInsertDynamicityEnabledDBAndCollection\", \"a\" : \"1\"}";
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(MongoDbConstants.DATABASE, "otherDB");
+        headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+        // Object result =
+        template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+        MongoCollection<Document> loaclDynamicCollection
+                = mongo.getDatabase("otherDB").getCollection("otherCollection", Document.class);
+
+        Document b = loaclDynamicCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledDBAndCollection")).first();
+        assertNotNull(b, "No record with 'testInsertDynamicityEnabledDBAndCollection' _id");
+
+        b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledDBOnly")).first();
+        assertNull(b, "There is a record with 'testInsertDynamicityEnabledDBAndCollection' _id in the test collection");
+
+        assertTrue(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+                "The otherDB database should exist");
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbExceptionHandlingIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbExceptionHandlingIT.java
new file mode 100644
index 0000000..9b3f478
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbExceptionHandlingIT.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import com.mongodb.DBObject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbExceptionHandlingIT.class,
+                MongoDbExceptionHandlingIT.TestConfiguration.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbExceptionHandlingIT extends AbstractMongoDbITSupport {
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:findAll").to(
+                                    "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findAll&dynamicity=true")
+                            .to("mock:resultFindAll");
+
+                    from("direct:findOneByQuery").to(
+                                    "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findOneByQuery&dynamicity=true")
+                            .to("mock:resultFindOneByQuery");
+
+                    from("direct:findById").to(
+                                    "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findById&dynamicity=true")
+                            .to("mock:resultFindById");
+                }
+            };
+        }
+    }
+
+    @Test
+    public void testInduceParseException() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+        pumpDataIntoTestCollection();
+
+        // notice missing quote at the end of Einstein
+        try {
+            template.requestBody("direct:findOneByQuery", "{\"scientist\": \"Einstein}");
+            fail("Should have thrown an exception");
+        } catch (Exception e) {
+            extractAndAssertCamelMongoDbException(e, null);
+        }
+    }
+
+    @Test
+    public void testInduceParseAndThenOkException() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+        pumpDataIntoTestCollection();
+
+        // notice missing quote at the end of Einstein
+        try {
+            template.requestBody("direct:findOneByQuery", "{\"scientist\": \"Einstein}");
+            fail("Should have thrown an exception");
+        } catch (Exception e) {
+            extractAndAssertCamelMongoDbException(e, null);
+        }
+
+        // this one is okay
+        DBObject out = template.requestBody("direct:findOneByQuery", "{\"scientist\": \"Einstein\"}", DBObject.class);
+        assertNotNull(out);
+        assertEquals("Einstein", out.get("scientist"));
+    }
+
+    @Test
+    public void testErroneousDynamicOperation() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+        pumpDataIntoTestCollection();
+
+        try {
+            template.requestBodyAndHeader("direct:findOneByQuery", new Document("scientist", "Einstein").toJson(),
+                    MongoDbConstants.OPERATION_HEADER, "dummyOp");
+            fail("Should have thrown an exception");
+        } catch (Exception e) {
+            extractAndAssertCamelMongoDbException(e, "Operation specified on header is not supported. Value: dummyOp");
+        }
+
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbFindOperationIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbFindOperationIT.java
new file mode 100644
index 0000000..89e037d
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbFindOperationIT.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+import static org.apache.camel.test.junit5.TestSupport.assertListSize;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import static com.mongodb.client.model.Filters.eq;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.mongodb.MongoDbComponent;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.infra.mongodb.services.MongoDBLocalContainerService;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.camel.util.IOHelper;
+import org.apache.commons.lang3.ObjectUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.bson.types.ObjectId;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Projections;
+
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+		classes = {
+				CamelAutoConfiguration.class,
+                MongoDbFindOperationIT.class,
+                MongoDbFindOperationIT.TestConfiguration.class
+		}
+)
+public class MongoDbFindOperationIT {
+
+	@RegisterExtension
+	public static MongoDBLocalContainerService service;
+
+	protected static String dbName = "test";
+	protected static String testCollectionName;
+
+	private static MongoClient mongo;
+	private static MongoDatabase db;
+	private static MongoCollection<Document> testCollection;
+	@Autowired
+	CamelContext context;
+	private ProducerTemplate template;
+
+	static {
+		// This one requires Mongo 4.4. This is related to
+		// "CAMEL-15604 support allowDiskUse for MongoDB find operations"
+		service = new MongoDBLocalContainerService("mongo:4.4");
+
+		service.getContainer()
+				.waitingFor(Wait.forListeningPort())
+				.withCommand(
+						"--replSet", "replicationName",
+						"--oplogSize", "5000",
+						"--syncdelay", "0",
+						"--noauth");
+	}
+
+	@BeforeAll
+	public static void beforeClass() {
+		mongo = MongoClients.create(service.getReplicaSetUrl());
+		db = mongo.getDatabase(dbName);
+	}
+
+	@BeforeEach
+	public void before() {
+		// Refresh the test collection - drop it and recreate it. We don't do
+		// this for the database because MongoDB would create large
+		// store files each time
+		testCollectionName = "camelTest";
+		testCollection = db.getCollection(testCollectionName, Document.class);
+		testCollection.drop();
+		testCollection = db.getCollection(testCollectionName, Document.class);
+
+		template = context.createProducerTemplate();
+	}
+
+	private MockEndpoint getMockEndpoint(String endpoint) {
+		return (MockEndpoint) context.getEndpoint(endpoint);
+	}
+
+	protected void pumpDataIntoTestCollection() {
+		// there should be 100 of each
+		String[] scientists
+				= {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
+		for (int i = 1; i <= 1000; i++) {
+			int index = i % scientists.length;
+			Formatter f = new Formatter();
+			String doc
+					= f.format("{\"_id\":\"%d\", \"scientist\":\"%s\", \"fixedField\": \"fixedValue\"}", i, scientists[index])
+					.toString();
+			IOHelper.close(f);
+			testCollection.insertOne(Document.parse(doc));
+		}
+		assertEquals(1000L, testCollection.countDocuments(), "Data pumping of 1000 entries did not complete entirely");
+	}
+
+	@Test
+	public void testFindAllNoCriteriaOperation() {
+		// Test that the collection has 0 documents in it
+		getMockEndpoint("mock:resultFindAll").reset();
+		assertEquals(0, testCollection.countDocuments());
+		pumpDataIntoTestCollection();
+
+		Object result = template.requestBody("direct:findAll", ObjectUtils.NULL);
+		assertTrue(result instanceof List, "Result is not of type List");
+
+		@SuppressWarnings("unchecked")
+		List<Document> resultList = (List<Document>) result;
+
+		assertListSize("Result does not contain all entries in collection", resultList, 1000);
+
+		// Ensure that all returned documents contain all fields
+		for (Document document : resultList) {
+			assertNotNull(document.get(MONGO_ID), "Document in returned list should contain all fields");
+			assertNotNull(document.get("scientist"), "Document in returned list should contain all fields");
+			assertNotNull(document.get("fixedField"), "Document in returned list should contain all fields");
+		}
+
+		Exchange resultExchange = getMockEndpoint("mock:resultFindAll").getReceivedExchanges().get(0);
+		// TODO: decide what to do with total count
+		// assertEquals("Result total size header should equal 1000", 1000,
+		// resultExchange.getIn().getHeader(MongoDbConstants.RESULT_TOTAL_SIZE));
+		assertEquals(1000, resultExchange.getIn().getHeader(MongoDbConstants.RESULT_PAGE_SIZE),
+				"Result page size header should equal 1000");
+	}
+
+	@Test
+	public void testFindAllAllowDiskUse() {
+		// Test that the collection has 0 documents in it
+		assertEquals(0, testCollection.countDocuments());
+		pumpDataIntoTestCollection();
+
+		Object result
+				= template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.ALLOW_DISK_USE, true);
+		assertTrue(result instanceof List, "Result (allowDiskUse=true) is not of type List");
+		assertListSize("Result (allowDiskUse=true) does not contain all entries in collection", (List<Document>) result, 1000);
+
+		result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.ALLOW_DISK_USE, false);
+		assertTrue(result instanceof List, "Result (allowDiskUse=false) is not of type List");
+		assertListSize("Result (allowDiskUse=false) does not contain all entries in collection", (List<Document>) result,
+				1000);
+
+		result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.ALLOW_DISK_USE, null);
+		assertTrue(result instanceof List, "Result (allowDiskUse=null) is not of type List");
+		assertListSize("Result (allowDiskUse=null) does not contain all entries in collection", (List<Document>) result, 1000);
+	}
+
+	@Test
+	public void testFindAllWithQueryAndNoFIlter() {
+		// Test that the collection has 0 documents in it
+		getMockEndpoint("mock:resultFindAll").reset();
+		assertEquals(0, testCollection.countDocuments());
+		pumpDataIntoTestCollection();
+
+		Object result = template.requestBody("direct:findAll", eq("scientist", "Einstein"));
+		assertTrue(result instanceof List, "Result is not of type List");
+
+		@SuppressWarnings("unchecked")
+		List<Document> resultList = (List<Document>) result;
+
+		assertListSize("Result does not contain correct number of Einstein entries", resultList, 100);
+
+		// Ensure that all returned documents contain all fields, and that they
+		// only contain 'Einstein'
+		for (Document document : resultList) {
+			assertNotNull(document.get(MONGO_ID), "Document in returned list should not contain field _id");
+			assertNotNull(document.get("scientist"), "Document in returned list does not contain field 'scientist'");
+			assertNotNull(document.get("fixedField"), "Document in returned list should not contain field fixedField");
+			assertEquals("Einstein", document.get("scientist"), "Document.scientist should only be Einstein");
+		}
+
+		Exchange resultExchange = getMockEndpoint("mock:resultFindAll").getReceivedExchanges().get(0);
+		assertEquals(100, resultExchange.getIn().getHeader(MongoDbConstants.RESULT_PAGE_SIZE),
+				"Result page size header should equal 100");
+	}
+
+	@Test
+	public void testFindAllWithQueryAndFilter() {
+		// Test that the collection has 0 documents in it
+		getMockEndpoint("mock:resultFindAll").reset();
+		assertEquals(0, testCollection.countDocuments());
+		pumpDataIntoTestCollection();
+		Bson fieldFilter = Projections.exclude(MONGO_ID, "fixedField");
+		Bson query = eq("scientist", "Einstein");
+		Object result = template.requestBodyAndHeader("direct:findAll", query, MongoDbConstants.FIELDS_PROJECTION, fieldFilter);
+		assertTrue(result instanceof List, "Result is not of type List");
+
+		@SuppressWarnings("unchecked")
+		List<Document> resultList = (List<Document>) result;
+
+		assertListSize("Result does not contain correct number of Einstein entries", resultList, 100);
+
+		// Ensure that all returned documents contain all fields, and that they
+		// only contain 'Einstein'
+		for (Document document : resultList) {
+			assertNull(document.get(MONGO_ID), "Document in returned list should not contain field _id");
+			assertNotNull(document.get("scientist"), "Document in returned list does not contain field 'scientist'");
+			assertNull(document.get("fixedField"), "Document in returned list should not contain field fixedField");
+			assertEquals("Einstein", document.get("scientist"), "Document.scientist should only be Einstein");
+		}
+
+		Exchange resultExchange = getMockEndpoint("mock:resultFindAll").getReceivedExchanges().get(0);
+		assertEquals(100, resultExchange.getIn().getHeader(MongoDbConstants.RESULT_PAGE_SIZE),
+				"Result page size header should equal 100");
+	}
+
+	@Test
+	public void testFindAllNoCriteriaWithFilterOperation() {
+		// Test that the collection has 0 documents in it
+		getMockEndpoint("mock:resultFindAll").reset();
+		assertEquals(0, testCollection.countDocuments());
+		pumpDataIntoTestCollection();
+
+		Bson fieldFilter = Projections.exclude(MONGO_ID, "fixedField");
+		Object result = template.requestBodyAndHeader("direct:findAll", ObjectUtils.NULL, MongoDbConstants.FIELDS_PROJECTION,
+				fieldFilter);
+		assertTrue(result instanceof List, "Result is not of type List");
+
+		@SuppressWarnings("unchecked")
+		List<Document> resultList = (List<Document>) result;
+
+		assertListSize("Result does not contain all entries in collection", resultList, 1000);
+
+		// Ensure that all returned documents contain all fields
+		for (Document document : resultList) {
+			assertNull(document.get(MONGO_ID), "Document in returned list should not contain field _id");
+			assertNotNull(document.get("scientist"), "Document in returned list does not contain field 'scientist'");
+			assertNull(document.get("fixedField"), "Document in returned list should not contain field fixedField");
+		}
+
+		Exchange resultExchange = getMockEndpoint("mock:resultFindAll").getReceivedExchanges().get(0);
+		// assertEquals("Result total size header should equal 1000", 1000,
+		// resultExchange.getIn().getHeader(MongoDbConstants.RESULT_TOTAL_SIZE));
+		assertEquals(1000, resultExchange.getIn().getHeader(MongoDbConstants.RESULT_PAGE_SIZE),
+				"Result page size header should equal 1000");
+	}
+
+	@Test
+	public void testFindAllIterationOperation() {
+		// Test that the collection has 0 documents in it
+		getMockEndpoint("mock:resultFindAll").reset();
+		assertEquals(0, testCollection.countDocuments());
+		pumpDataIntoTestCollection();
+
+		// Repeat ten times, obtain 10 batches of 100 results each time
+		int numToSkip = 0;
+		final int limit = 100;
+		for (int i = 0; i < 10; i++) {
+			Map<String, Object> headers = new HashMap<>();
+			headers.put(MongoDbConstants.NUM_TO_SKIP, numToSkip);
+			headers.put(MongoDbConstants.LIMIT, 100);
+			Object result = template.requestBodyAndHeaders("direct:findAll", ObjectUtils.NULL, headers);
+			assertTrue(result instanceof List, "Result is not of type List");
+
+			@SuppressWarnings("unchecked")
+			List<Document> resultList = (List<Document>) result;
+
+			assertListSize("Result does not contain 100 elements", resultList, 100);
+			assertEquals(numToSkip + 1, Integer.parseInt((String) resultList.get(0).get(MONGO_ID)),
+					"Id of first record is not as expected");
+
+			// Ensure that all returned documents contain all fields
+			for (Document document : resultList) {
+				assertNotNull(document.get(MONGO_ID), "Document in returned list should contain all fields");
+				assertNotNull(document.get("scientist"), "Document in returned list should contain all fields");
+				assertNotNull(document.get("fixedField"), "Document in returned list should contain all fields");
+			}
+
+			numToSkip = numToSkip + limit;
+		}
+
+		for (Exchange resultExchange : getMockEndpoint("mock:resultFindAll").getReceivedExchanges()) {
+			// TODO: decide what to do with the total number of elements
+			// assertEquals("Result total size header should equal 1000", 1000,
+			// resultExchange.getIn().getHeader(MongoDbConstants.RESULT_TOTAL_SIZE));
+			assertEquals(100, resultExchange.getIn().getHeader(MongoDbConstants.RESULT_PAGE_SIZE),
+					"Result page size header should equal 100");
+		}
+	}
+
+	@Test
+	public void testFindDistinctNoQuery() {
+		// Test that the collection has 0 documents in it
+		assertEquals(0, testCollection.countDocuments());
+		pumpDataIntoTestCollection();
+
+		Object result = template.requestBodyAndHeader("direct:findDistinct", null, MongoDbConstants.DISTINCT_QUERY_FIELD,
+				"scientist");
+		assertTrue(result instanceof List, "Result is not of type List");
+
+		@SuppressWarnings("unchecked")
+		List<String> resultList = (List<String>) result;
+		assertEquals(10, resultList.size());
+	}
+
+	@Test
+	public void testFindDistinctWithQuery() {
+		// Test that the collection has 0 documents in it
+		assertEquals(0, testCollection.countDocuments());
+		pumpDataIntoTestCollection();
+
+		Bson query = eq("scientist", "Einstein");
+
+		Object result = template.requestBodyAndHeader("direct:findDistinct", query, MongoDbConstants.DISTINCT_QUERY_FIELD,
+				"scientist");
+		assertTrue(result instanceof List, "Result is not of type List");
+
+		@SuppressWarnings("unchecked")
+		List<String> resultList = (List<String>) result;
+		assertEquals(1, resultList.size());
+
+		assertEquals("Einstein", resultList.get(0));
+	}
+
+	@Test
+	public void testFindOneByQuery() {
+		// Test that the collection has 0 documents in it
+		assertEquals(0, testCollection.countDocuments());
+		pumpDataIntoTestCollection();
+
+		Bson query = eq("scientist", "Einstein");
+		Document result = template.requestBody("direct:findOneByQuery", query, Document.class);
+		assertTrue(result instanceof Document, "Result is not of type Document");
+
+		assertNotNull(result.get(MONGO_ID), "Document in returned list should contain all fields");
+		assertNotNull(result.get("scientist"), "Document in returned list should contain all fields");
+		assertNotNull(result.get("fixedField"), "Document in returned list should contain all fields");
+	}
+
+	@Test
+	public void testFindOneById() {
+		// Test that the collection has 0 documents in it
+		assertEquals(0, testCollection.countDocuments());
+		pumpDataIntoTestCollection();
+
+		Document result = template.requestBody("direct:findById", "240", Document.class);
+		assertTrue(result instanceof Document, "Result is not of type Document");
+
+		assertEquals("240", result.get(MONGO_ID), "The ID of the retrieved Document should equal 240");
+		assertEquals("Einstein", result.get("scientist"), "The scientist name of the retrieved Document should equal Einstein");
+
+		assertNotNull(result.get(MONGO_ID), "Document in returned list should contain all fields");
+		assertNotNull(result.get("scientist"), "Document in returned list should contain all fields");
+		assertNotNull(result.get("fixedField"), "Document in returned list should contain all fields");
+	}
+
+	@Test
+	public void testFindOneByIdWithObjectId() {
+		// Test that the collection has 0 documents in it
+		assertEquals(0, testCollection.countDocuments());
+		Document insertObject = new Document("scientist", "Einstein");
+		testCollection.insertOne(insertObject);
+		assertTrue(insertObject.get(MONGO_ID) instanceof ObjectId, "The ID of the inserted document should be ObjectId");
+		ObjectId id = insertObject.getObjectId(MONGO_ID);
+
+		Document result = template.requestBody("direct:findById", id, Document.class);
+		assertTrue(result instanceof Document, "Result is not of type Document");
+
+		assertTrue(result.get(MONGO_ID) instanceof ObjectId, "The ID of the retrieved Document should be ObjectId");
+		assertEquals(id, result.get(MONGO_ID), "The ID of the retrieved Document should equal to the inserted");
+		assertEquals("Einstein", result.get("scientist"), "The scientist name of the retrieved Document should equal Einstein");
+
+		assertNotNull(result.get(MONGO_ID), "Document in returned list should contain all fields");
+		assertNotNull(result.get("scientist"), "Document in returned list should contain all fields");
+	}
+
+	@Configuration
+	public class TestConfiguration {
+
+		@Bean
+		public MongoClient mongoCLient() {
+			return mongo;
+		}
+
+		@Bean
+		public MongoDbComponent mongoDbComponent() {
+			MongoDbComponent component = new MongoDbComponent();
+			component.setMongoConnection(mongo);
+
+			return component;
+		}
+
+		@Bean
+		public void setProperties() {
+			context.getPropertiesComponent().setLocation("classpath:mongodb.test.properties");
+		}
+
+		@Bean
+		public RouteBuilder routeBuilder() {
+			return new RouteBuilder() {
+				@Override
+				public void configure() {
+					from("direct:findAll").to(
+									"mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findAll&dynamicity=true")
+							.to("mock:resultFindAll");
+
+					from("direct:findOneByQuery").to(
+									"mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findOneByQuery&dynamicity=true")
+							.to("mock:resultFindOneByQuery");
+
+					from("direct:findById").to(
+									"mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findById&dynamicity=true")
+							.to("mock:resultFindById");
+
+					from("direct:findDistinct").to(
+									"mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findDistinct&dynamicity=true")
+							.to("mock:resultFindDistinct");
+				}
+			};
+		}
+	}
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbHeaderHandlingIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbHeaderHandlingIT.java
new file mode 100644
index 0000000..6b91f0a
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbHeaderHandlingIT.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import static com.mongodb.client.model.Filters.eq;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.Test;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import com.mongodb.client.result.UpdateResult;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+		classes = {
+				CamelAutoConfiguration.class,
+				MongoDbHeaderHandlingIT.class,
+				MongoDbHeaderHandlingIT.TestConfiguration.class,
+				AbstractMongoDbITSupport.MongoConfiguration.class
+		}
+)
+public class MongoDbHeaderHandlingIT extends AbstractMongoDbITSupport {
+
+	@Test
+	public void testInHeadersTransferredToOutOnCount() {
+		// a read operation
+		assertEquals(0, testCollection.countDocuments());
+		Exchange result = template.request("direct:count", new Processor() {
+			@Override
+			public void process(Exchange exchange) {
+				exchange.getIn().setBody("irrelevant body");
+				exchange.getIn().setHeader("abc", "def");
+			}
+		});
+		assertTrue(result.getMessage().getBody() instanceof Long, "Result is not of type Long");
+		assertEquals(0L, result.getMessage().getBody(), "Test collection should not contain any records");
+		assertEquals("def", result.getMessage().getHeader("abc"), "An input header was not returned");
+	}
+
+	@Test
+	public void testInHeadersTransferredToOutOnInsert() {
+		Exchange result = template.request("direct:insert", new Processor() {
+			@Override
+			public void process(Exchange exchange) {
+				exchange.getIn().setBody("{\"_id\":\"testInsertString\", \"scientist\":\"Einstein\"}");
+				exchange.getIn().setHeader("abc", "def");
+			}
+		});
+
+		// TODO: WriteResult isn't return when inserting
+		// assertTrue(result.getOut().getBody() instanceof WriteResult);
+		assertEquals("def", result.getMessage().getHeader("abc"), "An input header was not returned");
+		Document b = testCollection.find(eq(MONGO_ID, "testInsertString")).first();
+		assertNotNull(b, "No record with 'testInsertString' _id");
+	}
+
+	@Test
+	public void testWriteResultAsHeaderWithWriteOp() {
+		// Prepare test
+		assertEquals(0, testCollection.countDocuments());
+		Object[] req = new Object[] {
+				new Document(MONGO_ID, "testSave1").append("scientist", "Einstein").toJson(),
+				new Document(MONGO_ID, "testSave2").append("scientist", "Copernicus").toJson()};
+		// Object result =
+		template.requestBody("direct:insert", req);
+		// assertTrue(result instanceof WriteResult);
+		assertEquals(2, testCollection.countDocuments(), "Number of records persisted must be 2");
+
+		// Testing the save logic
+		final Document record1 = testCollection.find(eq(MONGO_ID, "testSave1")).first();
+		assertEquals("Einstein", record1.get("scientist"), "Scientist field of 'testSave1' must equal 'Einstein'");
+		record1.put("scientist", "Darwin");
+
+		// test that as a payload, we get back exactly our input, but enriched
+		// with the CamelMongoDbWriteResult header
+		Exchange resultExch = template.request("direct:save", new Processor() {
+			@Override
+			public void process(Exchange exchange) {
+				exchange.getIn().setBody(record1);
+			}
+		});
+		assertTrue(resultExch.getMessage().getBody() instanceof Document);
+		assertEquals(record1, resultExch.getMessage().getBody());
+		assertTrue(resultExch.getMessage().getHeader(MongoDbConstants.WRITERESULT) instanceof UpdateResult);
+
+		Document record2 = testCollection.find(eq(MONGO_ID, "testSave1")).first();
+		assertEquals("Darwin", record2.get("scientist"),
+				"Scientist field of 'testSave1' must equal 'Darwin' after save operation");
+	}
+
+	@Test
+	public void testWriteResultAsHeaderWithReadOp() {
+		Exchange resultExch = template.request("direct:getDbStats", new Processor() {
+			@Override
+			public void process(Exchange exchange) {
+				exchange.getIn().setBody("irrelevantBody");
+				exchange.getIn().setHeader("abc", "def");
+			}
+		});
+		assertTrue(resultExch.getMessage().getBody() instanceof Document);
+		assertNull(resultExch.getMessage().getHeader(MongoDbConstants.WRITERESULT));
+		assertEquals("def", resultExch.getMessage().getHeader("abc"));
+	}
+
+	@Configuration
+	public class TestConfiguration {
+
+		@Bean
+		public RouteBuilder routeBuilder() {
+			return new RouteBuilder() {
+				@Override
+				public void configure() {
+					from("direct:count").to(
+							"mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=count&dynamicity=true");
+					from("direct:save").to(
+							"mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save&writeResultAsHeader=true");
+					from("direct:getDbStats").to(
+							"mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getDbStats&writeResultAsHeader=true");
+
+					// supporting routes
+					from("direct:insert")
+							.to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+				}
+			};
+		}
+	}
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbIndexIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbIndexIT.java
new file mode 100644
index 0000000..cca8903
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbIndexIT.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import static com.mongodb.client.model.Filters.eq;
+import static com.mongodb.client.model.Indexes.ascending;
+import static com.mongodb.client.model.Indexes.descending;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import com.mongodb.client.ListIndexesIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.StreamSupport;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+		classes = {
+				CamelAutoConfiguration.class,
+				MongoDbIndexIT.class,
+				MongoDbIndexIT.TestConfiguration.class,
+				AbstractMongoDbITSupport.MongoConfiguration.class
+		}
+)
+public class MongoDbIndexIT extends AbstractMongoDbITSupport {
+
+	@Test
+	public void testInsertDynamicityEnabledDBAndCollectionAndIndex() {
+		assertEquals(0, testCollection.countDocuments());
+		mongo.getDatabase("otherDB").drop();
+		db.getCollection("otherCollection").drop();
+		assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+				"The otherDB database should not exist");
+
+		String body = "{\"_id\": \"testInsertDynamicityEnabledDBAndCollection\", \"a\" : 1, \"b\" : 2}";
+		Map<String, Object> headers = new HashMap<>();
+		headers.put(MongoDbConstants.DATABASE, "otherDB");
+		headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+
+		List<Document> objIndex = new ArrayList<>();
+		Document index1 = new Document();
+		index1.put("a", 1);
+		Document index2 = new Document();
+		index2.put("b", -1);
+		objIndex.add(index1);
+		objIndex.add(index2);
+
+		headers.put(MongoDbConstants.COLLECTION_INDEX, objIndex);
+
+		Object result = template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+		assertEquals(Document.class, result.getClass(), "Response isn't of type WriteResult");
+
+		MongoCollection<Document> localDynamicCollection
+				= mongo.getDatabase("otherDB").getCollection("otherCollection", Document.class);
+
+		ListIndexesIterable<Document> indexInfos = localDynamicCollection.listIndexes(Document.class);
+
+		MongoCursor<Document> iterator = indexInfos.iterator();
+		iterator.next();
+		Document key1 = iterator.next().get("key", Document.class);
+		Document key2 = iterator.next().get("key", Document.class);
+
+		assertTrue(key1.containsKey("a") && 1 == key1.getInteger("a"), "No index on the field a");
+		assertTrue(key2.containsKey("b") && -1 == key2.getInteger("b"), "No index on the field b");
+
+		Document b = localDynamicCollection.find(new Document(MONGO_ID, "testInsertDynamicityEnabledDBAndCollection")).first();
+		assertNotNull(b, "No record with 'testInsertDynamicityEnabledDBAndCollection' _id");
+
+		b = testCollection.find(new Document(MONGO_ID, "testInsertDynamicityEnabledDBOnly")).first();
+		assertNull(b, "There is a record with 'testInsertDynamicityEnabledDBAndCollection' _id in the test collection");
+
+		assertTrue(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+				"The otherDB database should exist");
+	}
+
+	@Test
+	public void testInsertDynamicityEnabledCollectionAndIndex() {
+		assertEquals(0, testCollection.countDocuments());
+		mongo.getDatabase("otherDB").drop();
+		db.getCollection("otherCollection").drop();
+		assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+				"The otherDB database should not exist");
+
+		String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionAndIndex\", \"a\" : 1, \"b\" : 2}";
+		Map<String, Object> headers = new HashMap<>();
+		headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+
+		List<Bson> objIndex = Arrays.asList(ascending("a"), descending("b"));
+		headers.put(MongoDbConstants.COLLECTION_INDEX, objIndex);
+
+		Object result = template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+		assertEquals(Document.class, result.getClass(), "Response isn't of type WriteResult");
+
+		MongoCollection<Document> localDynamicCollection = db.getCollection("otherCollection", Document.class);
+
+		MongoCursor<Document> indexInfos = localDynamicCollection.listIndexes(Document.class).iterator();
+
+		indexInfos.next();
+		Document key1 = indexInfos.next().get("key", Document.class);
+		Document key2 = indexInfos.next().get("key", Document.class);
+
+		assertTrue(key1.containsKey("a") && 1 == key1.getInteger("a"), "No index on the field a");
+		assertTrue(key2.containsKey("b") && -1 == key2.getInteger("b"), "No index on the field b");
+
+		Document b = localDynamicCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledCollectionAndIndex")).first();
+		assertNotNull(b, "No record with 'testInsertDynamicityEnabledCollectionAndIndex' _id");
+
+		b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledDBOnly")).first();
+		assertNull(b, "There is a record with 'testInsertDynamicityEnabledDBAndCollection' _id in the test collection");
+
+		for (String db : mongo.listDatabaseNames()) {
+			assertFalse(db.contains("otherDB"), "The otherDB database should not exist");
+		}
+	}
+
+	@Test
+	public void testInsertDynamicityEnabledCollectionOnlyAndURIIndex() {
+		assertEquals(0, testCollection.countDocuments());
+		mongo.getDatabase("otherDB").drop();
+		db.getCollection("otherCollection").drop();
+		assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+				"The otherDB database should not exist");
+
+		String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionOnlyAndURIIndex\", \"a\" : 1, \"b\" : 2}";
+		Map<String, Object> headers = new HashMap<>();
+		headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+
+		Object result = template.requestBodyAndHeaders("direct:dynamicityEnabledWithIndexUri", body, headers);
+
+		assertEquals(Document.class, result.getClass(), "Response isn't of type WriteResult");
+
+		MongoCollection<Document> localDynamicCollection = db.getCollection("otherCollection", Document.class);
+
+		MongoCursor<Document> indexInfos = localDynamicCollection.listIndexes().iterator();
+
+		Document key1 = indexInfos.next().get("key", Document.class);
+
+		assertFalse(key1.containsKey("a") && "-1".equals(key1.getString("a")), "No index on the field a");
+
+		Document b = localDynamicCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledCollectionOnlyAndURIIndex")).first();
+		assertNotNull(b, "No record with 'testInsertDynamicityEnabledCollectionOnlyAndURIIndex' _id");
+
+		b = testCollection.find(eq(MONGO_ID, "testInsertDynamicityEnabledCollectionOnlyAndURIIndex")).first();
+		assertNull(b,
+				"There is a record with 'testInsertDynamicityEnabledCollectionOnlyAndURIIndex' _id in the test collection");
+
+		assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+				"The otherDB database should not exist");
+	}
+
+	@Disabled
+	@Test
+	public void testInsertAutoCreateCollectionAndURIIndex() {
+		assertEquals(0, testCollection.countDocuments());
+		db.getCollection("otherCollection").deleteOne(new Document());
+
+		String body = "{\"_id\": \"testInsertAutoCreateCollectionAndURIIndex\", \"a\" : 1, \"b\" : 2}";
+		Map<String, Object> headers = new HashMap<>();
+
+		Object result = template.requestBodyAndHeaders("direct:dynamicityDisabled", body, headers);
+		assertEquals(Document.class, result.getClass(), "Response isn't of type WriteResult");
+
+		MongoCollection<Document> collection = db.getCollection("otherCollection", Document.class);
+		MongoCursor<Document> indexInfos = collection.listIndexes().iterator();
+
+		Document key1 = indexInfos.next().get("key", Document.class);
+		Document key2 = indexInfos.next().get("key", Document.class);
+
+		assertTrue(key1.containsKey("b") && "-1".equals(key1.getString("b")), "No index on the field b");
+		assertTrue(key2.containsKey("a") && "1".equals(key2.getString("a")), "No index on the field a");
+
+		Document b = collection.find(eq(MONGO_ID, "testInsertAutoCreateCollectionAndURIIndex")).first();
+		assertNotNull(b, "No record with 'testInsertAutoCreateCollectionAndURIIndex' _id");
+
+		b = testCollection.find(eq(MONGO_ID, "testInsertAutoCreateCollectionAndURIIndex")).first();
+		assertNull(b, "There is a record with 'testInsertAutoCreateCollectionAndURIIndex' _id in the test collection");
+
+		assertFalse(StreamSupport.stream(mongo.listDatabaseNames().spliterator(), false).anyMatch("otherDB"::equals),
+				"The otherDB database should not exist");
+	}
+
+	@Configuration
+	public class TestConfiguration {
+
+		@Bean
+		public RouteBuilder routeBuilder() {
+			return new RouteBuilder() {
+				@Override
+				public void configure() {
+					from("direct:dynamicityEnabled").to(
+							"mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=true");
+					from("direct:dynamicityEnabledWithIndexUri")
+							.to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&collectionIndex={\"a\":1}&operation=insert&dynamicity=true");
+					from("direct:dynamicityDisabled")
+							.to("mongodb:myDb?database={{mongodb.testDb}}&collection=otherCollection&collectionIndex={\"a\":1,\"b\":-1}&operation=insert&dynamicity=false");
+				}
+			};
+		}
+	}
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbInsertBatchIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbInsertBatchIT.java
new file mode 100644
index 0000000..ec10e17
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbInsertBatchIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.Test;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import com.mongodb.BasicDBObject;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+		classes = {
+				CamelAutoConfiguration.class,
+				MongoDbInsertBatchIT.class,
+				MongoDbInsertBatchIT.TestConfiguration.class,
+				AbstractMongoDbITSupport.MongoConfiguration.class
+		}
+)
+public class MongoDbInsertBatchIT extends AbstractMongoDbITSupport {
+
+	@Test
+	public void testInsertBatch() {
+		assertEquals(0, testCollection.countDocuments());
+
+		Document a = new Document(MongoDbConstants.MONGO_ID, "testInsert1");
+		a.append("MyId", 1).toJson();
+		Document b = new Document(MongoDbConstants.MONGO_ID, "testInsert2");
+		b.append("MyId", 2).toJson();
+		Document c = new Document(MongoDbConstants.MONGO_ID, "testInsert3");
+		c.append("MyId", 3).toJson();
+
+		List<Document> taxGroupList = new ArrayList<>();
+		taxGroupList.add(a);
+		taxGroupList.add(b);
+		taxGroupList.add(c);
+
+		Exchange out = context.createFluentProducerTemplate()
+				.to("direct:insert").withBody(taxGroupList).send();
+
+		List oid = out.getMessage().getHeader(MongoDbConstants.OID, List.class);
+		assertNotNull(oid);
+		assertEquals(3, oid.size());
+
+		Document out1 = testCollection.find(new BasicDBObject("_id", oid.get(0))).first();
+		assertNotNull(out1);
+		assertEquals(1, out1.getInteger("MyId"));
+		Document out2 = testCollection.find(new BasicDBObject("_id", oid.get(1))).first();
+		assertNotNull(out2);
+		assertEquals(2, out2.getInteger("MyId"));
+		Document out3 = testCollection.find(new BasicDBObject("_id", oid.get(2))).first();
+		assertNotNull(out3);
+		assertEquals(3, out3.getInteger("MyId"));
+	}
+
+	@Configuration
+	public class TestConfiguration {
+
+		@Bean
+		public RouteBuilder routeBuilder() {
+			return new RouteBuilder() {
+				@Override
+				public void configure() {
+					from("direct:insert")
+							.to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+				}
+			};
+		}
+	}
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbOperationsIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbOperationsIT.java
new file mode 100644
index 0000000..3531df4
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbOperationsIT.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import java.util.Arrays;
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.List;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.result.DeleteResult;
+import com.mongodb.client.result.UpdateResult;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.component.mongodb.MongoDbOperation;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.bson.types.ObjectId;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static com.mongodb.client.model.Accumulators.sum;
+import static com.mongodb.client.model.Aggregates.group;
+import static com.mongodb.client.model.Aggregates.match;
+import static com.mongodb.client.model.Filters.eq;
+import static com.mongodb.client.model.Filters.or;
+import static com.mongodb.client.model.Updates.combine;
+import static com.mongodb.client.model.Updates.currentTimestamp;
+import static com.mongodb.client.model.Updates.set;
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+import static org.apache.camel.test.junit5.TestSupport.assertListSize;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbOperationsIT.class,
+                MongoDbOperationsIT.TestConfiguration.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbOperationsIT extends AbstractMongoDbITSupport {
+
+    @Test
+    public void testCountOperation() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+        Object result = template.requestBody("direct:count", "irrelevantBody");
+        assertTrue(result instanceof Long, "Result is not of type Long");
+        assertEquals(0L, result, "Test collection should not contain any records");
+
+        // Insert a record and test that the endpoint now returns 1
+        testCollection.insertOne(Document.parse("{a:60}"));
+        result = template.requestBody("direct:count", "irrelevantBody");
+        assertTrue(result instanceof Long, "Result is not of type Long");
+        assertEquals(1L, result, "Test collection should contain 1 record");
+        testCollection.deleteOne(new Document());
+
+        // test dynamicity
+        dynamicCollection.insertOne(Document.parse("{a:60}"));
+        result = template.requestBodyAndHeader("direct:count", "irrelevantBody", MongoDbConstants.COLLECTION,
+                dynamicCollectionName);
+        assertTrue(result instanceof Long, "Result is not of type Long");
+        assertEquals(1L, result, "Dynamic collection should contain 1 record");
+
+    }
+
+    @Test
+    public void testInsertString() {
+        assertEquals(0, testCollection.countDocuments());
+        Object result = template.requestBody("direct:insert",
+                new Document(MONGO_ID, "testInsertString").append("scientist", "Einstein").toJson());
+        assertTrue(result instanceof Document);
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertString")).first();
+        assertNotNull(b, "No record with 'testInsertString' _id");
+    }
+
+    @Test
+    public void testStoreOidOnInsert() {
+        Document document = new Document();
+        ObjectId oid = template.requestBody("direct:testStoreOidOnInsert", document, ObjectId.class);
+        assertEquals(document.get(MONGO_ID), oid);
+    }
+
+    @Test
+    public void testStoreOidsOnInsert() {
+        Document firsDocument = new Document();
+        Document secondDoocument = new Document();
+        List<?> oids
+                = template.requestBody("direct:testStoreOidOnInsert", Arrays.asList(firsDocument, secondDoocument), List.class);
+        assertTrue(oids.contains(firsDocument.get(MONGO_ID)));
+        assertTrue(oids.contains(secondDoocument.get(MONGO_ID)));
+    }
+
+    @Test
+    public void testSave() {
+        // Prepare test
+        assertEquals(0, testCollection.countDocuments());
+        Object[] req = new Object[] {
+                new Document(MONGO_ID, "testSave1").append("scientist", "Einstein").toJson(),
+                new Document(MONGO_ID, "testSave2").append("scientist", "Copernicus").toJson() };
+        Object result = template.requestBody("direct:insert", req);
+        assertTrue(result instanceof List);
+        assertEquals(2, testCollection.countDocuments(), "Number of records persisted must be 2");
+
+        // Testing the save logic
+        Document record1 = testCollection.find(eq(MONGO_ID, "testSave1")).first();
+        assertEquals("Einstein", record1.get("scientist"), "Scientist field of 'testSave1' must equal 'Einstein'");
+        record1.put("scientist", "Darwin");
+
+        result = template.requestBody("direct:save", record1);
+        assertTrue(result instanceof UpdateResult);
+
+        record1 = testCollection.find(eq(MONGO_ID, "testSave1")).first();
+        assertEquals("Darwin", record1.get("scientist"),
+                "Scientist field of 'testSave1' must equal 'Darwin' after save operation");
+
+    }
+
+    @Test
+    public void testSaveWithoutId() {
+        // Prepare test
+        assertEquals(0, testCollection.countDocuments());
+        // This document should not be modified
+        Document doc = new Document("scientist", "Copernic");
+        template.requestBody("direct:insert", doc);
+        // save (upsert) a document without Id => insert with new Id
+        doc = new Document("scientist", "Einstein");
+        assertNull(doc.get(MONGO_ID));
+        UpdateResult result = template.requestBody("direct:save", doc, UpdateResult.class);
+        assertNotNull(result.getUpsertedId());
+        // Without Id save perform an insert not an update.
+        assertEquals(0, result.getModifiedCount());
+        // Testing the save logic
+        Document record1 = testCollection.find(eq(MONGO_ID, result.getUpsertedId())).first();
+        assertEquals("Einstein", record1.get("scientist"),
+                "Scientist field of '" + result.getUpsertedId() + "' must equal 'Einstein'");
+    }
+
+    @Test
+    public void testStoreOidOnSaveWithoutId() {
+        Document document = new Document();
+        ObjectId oid = template.requestBody("direct:testStoreOidOnSave", document, ObjectId.class);
+        assertNotNull(oid);
+    }
+
+    @Test
+    public void testStoreOidOnSave() {
+        Document document = new Document(MONGO_ID, new ObjectId("5847e39e0824d6b54194e197"));
+        ObjectId oid = template.requestBody("direct:testStoreOidOnSave", document, ObjectId.class);
+        assertEquals(document.get(MONGO_ID), oid);
+    }
+
+    @Test
+    public void testUpdate() {
+        // Prepare test
+        assertEquals(0, testCollection.countDocuments());
+        for (int i = 1; i <= 100; i++) {
+            String body = null;
+            try (Formatter f = new Formatter();) {
+                if (i % 2 == 0) {
+                    body = f.format("{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\"}", i).toString();
+                } else {
+                    body = f.format("{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\", \"extraField\": true}", i).toString();
+                }
+                f.close();
+            }
+            template.requestBody("direct:insert", body);
+        }
+        assertEquals(100L, testCollection.countDocuments());
+
+        // Testing the update logic
+        Bson extraField = eq("extraField", true);
+        assertEquals(50L, testCollection.countDocuments(extraField),
+                "Number of records with 'extraField' flag on must equal 50");
+        assertEquals(0, testCollection.countDocuments(new Document("scientist", "Darwin")),
+                "Number of records with 'scientist' field = Darwin on must equal 0");
+
+        Bson updateObj = combine(set("scientist", "Darwin"), currentTimestamp("lastModified"));
+
+        Exchange resultExchange = template.request("direct:update", new Processor() {
+            @Override
+            public void process(Exchange exchange) {
+                exchange.getIn().setBody(new Bson[] { extraField, updateObj });
+                exchange.getIn().setHeader(MongoDbConstants.MULTIUPDATE, true);
+            }
+        });
+        Object result = resultExchange.getMessage().getBody();
+        assertTrue(result instanceof UpdateResult);
+        assertEquals(50L, resultExchange.getMessage().getHeader(MongoDbConstants.RECORDS_AFFECTED),
+                "Number of records updated header should equal 50");
+
+        assertEquals(50, testCollection.countDocuments(new Document("scientist", "Darwin")),
+                "Number of records with 'scientist' field = Darwin on must equal 50 after update");
+    }
+
+    @Test
+    public void testUpdateFromString() {
+        // Prepare test
+        assertEquals(0, testCollection.countDocuments());
+        for (int i = 1; i <= 100; i++) {
+            String body = null;
+            try (Formatter f = new Formatter();) {
+                if (i % 2 == 0) {
+                    body = f.format("{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\"}", i).toString();
+                } else {
+                    body = f.format("{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\", \"extraField\": true}", i).toString();
+                }
+                f.close();
+            }
+            template.requestBody("direct:insert", body);
+        }
+        assertEquals(100L, testCollection.countDocuments());
+
+        // Testing the update logic
+        Bson extraField = eq("extraField", true);
+        assertEquals(50L, testCollection.countDocuments(extraField),
+                "Number of records with 'extraField' flag on must equal 50");
+        assertEquals(0, testCollection.countDocuments(new Document("scientist", "Darwin")),
+                "Number of records with 'scientist' field = Darwin on must equal 0");
+
+        Bson updateObj = combine(set("scientist", "Darwin"), currentTimestamp("lastModified"));
+
+        String updates
+                = "[" + extraField.toBsonDocument(Document.class, MongoClientSettings.getDefaultCodecRegistry()).toJson() + ","
+                  + updateObj.toBsonDocument(Document.class, MongoClientSettings.getDefaultCodecRegistry()).toJson() + "]";
+
+        Exchange resultExchange = template.request("direct:update", new Processor() {
+            @Override
+            public void process(Exchange exchange) {
+                exchange.getIn().setBody(updates);
+                exchange.getIn().setHeader(MongoDbConstants.MULTIUPDATE, true);
+            }
+        });
+        Object result = resultExchange.getMessage().getBody();
+        assertTrue(result instanceof UpdateResult);
+        assertEquals(50L, resultExchange.getMessage().getHeader(MongoDbConstants.RECORDS_AFFECTED),
+                "Number of records updated header should equal 50");
+
+        assertEquals(50, testCollection.countDocuments(new Document("scientist", "Darwin")),
+                "Number of records with 'scientist' field = Darwin on must equal 50 after update");
+    }
+
+    @Test
+    public void testUpdateUsingFieldsFilterHeader() {
+        // Prepare test
+        assertEquals(0, testCollection.countDocuments());
+        for (int i = 1; i <= 100; i++) {
+            String body = null;
+            try (Formatter f = new Formatter();) {
+                if (i % 2 == 0) {
+                    body = f.format("{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\"}", i).toString();
+                } else {
+                    body = f.format("{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\", \"extraField\": true}", i).toString();
+                }
+                f.close();
+            }
+            template.requestBody("direct:insert", body);
+        }
+        assertEquals(100L, testCollection.countDocuments());
+
+        // Testing the update logic
+        Bson extraField = eq("extraField", true);
+        assertEquals(50L, testCollection.countDocuments(extraField),
+                "Number of records with 'extraField' flag on must equal 50");
+        assertEquals(0, testCollection.countDocuments(new Document("scientist", "Darwin")),
+                "Number of records with 'scientist' field = Darwin on must equal 0");
+
+        Bson updateObj = combine(set("scientist", "Darwin"), currentTimestamp("lastModified"));
+        HashMap<String, Object> headers = new HashMap<>();
+        headers.put(MongoDbConstants.MULTIUPDATE, true);
+        headers.put(MongoDbConstants.CRITERIA, extraField);
+        Object result = template.requestBodyAndHeaders("direct:update", updateObj, headers);
+        assertTrue(result instanceof UpdateResult);
+        assertEquals(50L, UpdateResult.class.cast(result).getModifiedCount(),
+                "Number of records updated header should equal 50");
+        assertEquals(50, testCollection.countDocuments(new Document("scientist", "Darwin")),
+                "Number of records with 'scientist' field = Darwin on must equal 50 after update");
+    }
+
+    @Test
+    public void testRemove() {
+        // Prepare test
+        assertEquals(0, testCollection.countDocuments());
+        for (int i = 1; i <= 100; i++) {
+            String body = null;
+            try (Formatter f = new Formatter()) {
+                if (i % 2 == 0) {
+                    body = f.format("{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\"}", i).toString();
+                } else {
+                    body = f.format("{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\", \"extraField\": true}", i).toString();
+                }
+                f.close();
+            }
+            template.requestBody("direct:insert", body);
+        }
+        assertEquals(100L, testCollection.countDocuments());
+
+        // Testing the update logic
+        Bson extraField = Filters.eq("extraField", true);
+        assertEquals(50L, testCollection.countDocuments(extraField),
+                "Number of records with 'extraField' flag on must equal 50");
+
+        Exchange resultExchange = template.request("direct:remove", new Processor() {
+            @Override
+            public void process(Exchange exchange) {
+                exchange.getIn().setBody(extraField);
+            }
+        });
+        Object result = resultExchange.getMessage().getBody();
+        assertTrue(result instanceof DeleteResult);
+        assertEquals(50L, resultExchange.getMessage().getHeader(MongoDbConstants.RECORDS_AFFECTED),
+                "Number of records deleted header should equal 50");
+
+        assertEquals(0, testCollection.countDocuments(extraField),
+                "Number of records with 'extraField' flag on must be 0 after remove");
+
+    }
+
+    @Test
+    public void testAggregate() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+        pumpDataIntoTestCollection();
+
+        // Repeat ten times, obtain 10 batches of 100 results each time
+        List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist", "Einstein"))),
+                group("$scientist", sum("count", 1)));
+        Object result = template.requestBody("direct:aggregate", aggregate);
+        assertTrue(result instanceof List, "Result is not of type List");
+
+        @SuppressWarnings("unchecked")
+        List<Document> resultList = (List<Document>) result;
+        assertListSize("Result does not contain 2 elements", resultList, 2);
+        // TODO Add more asserts
+    }
+
+    @Test
+    public void testDbStats() {
+        assertEquals(0, testCollection.countDocuments());
+        Object result = template.requestBody("direct:getDbStats", "irrelevantBody");
+        assertTrue(result instanceof Document, "Result is not of type Document");
+        assertTrue(Document.class.cast(result).keySet().size() > 0, "The result should contain keys");
+    }
+
+    @Test
+    public void testColStats() {
+        assertEquals(0, testCollection.countDocuments());
+
+        // Add some records to the collection (and do it via camel-mongodb)
+        for (int i = 1; i <= 100; i++) {
+            String body = null;
+            try (Formatter f = new Formatter();) {
+                body = f.format("{\"_id\":\"testSave%d\", \"scientist\":\"Einstein\"}", i).toString();
+                f.close();
+            }
+            template.requestBody("direct:insert", body);
+        }
+
+        Object result = template.requestBody("direct:getColStats", "irrelevantBody");
+        assertTrue(result instanceof Document, "Result is not of type Document");
+        assertTrue(Document.class.cast(result).keySet().size() > 0, "The result should contain keys");
+    }
+
+    @Test
+    public void testCommand() {
+        // Call hostInfo, command working with every configuration
+        Object result = template.requestBody("direct:command", "{\"hostInfo\":\"1\"}");
+        assertTrue(result instanceof Document, "Result is not of type Document");
+        assertTrue(Document.class.cast(result).keySet().size() > 0, "The result should contain keys");
+    }
+
+    @Test
+    public void testOperationHeader() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.countDocuments());
+
+        // check that the count operation was invoked instead of the insert
+        // operation
+        Object result
+                = template.requestBodyAndHeader("direct:insert", "irrelevantBody", MongoDbConstants.OPERATION_HEADER, "count");
+        assertTrue(result instanceof Long, "Result is not of type Long");
+        assertEquals(0L, result, "Test collection should not contain any records");
+
+        // check that the count operation was invoked instead of the insert
+        // operation
+        result = template.requestBodyAndHeader("direct:insert", "irrelevantBody", MongoDbConstants.OPERATION_HEADER,
+                MongoDbOperation.count);
+        assertTrue(result instanceof Long, "Result is not of type Long");
+        assertEquals(0L, result, "Test collection should not contain any records");
+
+    }
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:count").to(
+                            "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=count&dynamicity=true");
+                    from("direct:insert")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+                    from("direct:testStoreOidOnInsert")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert")
+                            .setBody()
+                            .header(MongoDbConstants.OID);
+                    from("direct:save")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save");
+                    from("direct:testStoreOidOnSave")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save")
+                            .setBody()
+                            .header(MongoDbConstants.OID);
+                    from("direct:update")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=update");
+                    from("direct:remove")
+                            .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=remove");
+                    from("direct:aggregate").to(
+                            "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate");
+                    from("direct:getDbStats").to("mongodb:myDb?database={{mongodb.testDb}}&operation=getDbStats");
+                    from("direct:getColStats").to(
+                            "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getColStats");
+                    from("direct:command").to("mongodb:myDb?database={{mongodb.testDb}}&operation=command");
+                }
+            };
+        }
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbOutputTypeIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbOutputTypeIT.java
new file mode 100644
index 0000000..73d8201
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbOutputTypeIT.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+import static org.apache.camel.test.junit5.TestSupport.assertListSize;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbConstants;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.apache.commons.lang3.ObjectUtils;
+
+import org.junit.jupiter.api.Test;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import com.mongodb.client.MongoIterable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+		classes = {
+				CamelAutoConfiguration.class,
+				MongoDbOutputTypeIT.class,
+				MongoDbOutputTypeIT.TestConfiguration.class,
+				AbstractMongoDbITSupport.MongoConfiguration.class
+		}
+)
+public class MongoDbOutputTypeIT extends AbstractMongoDbITSupport {
+
+	@Test
+	public void testFindAllDBCursor() {
+		// Test that the collection has 0 documents in it
+		assertEquals(0, testCollection.countDocuments());
+		pumpDataIntoTestCollection();
+		// Repeat ten times, obtain 10 batches of 100 results each time
+		int numToSkip = 0;
+		final int limit = 100;
+		for (int i = 0; i < 10; i++) {
+			Map<String, Object> headers = new HashMap<>();
+			headers.put(MongoDbConstants.NUM_TO_SKIP, numToSkip);
+			headers.put(MongoDbConstants.LIMIT, 100);
+			Object result = template.requestBodyAndHeaders("direct:findAllDBCursor", ObjectUtils.NULL, headers);
+			assertTrue(result instanceof MongoIterable, "Result is not of type MongoIterable");
+
+			@SuppressWarnings("unchecked")
+			MongoIterable<Document> resultCursor = (MongoIterable<Document>) result;
+			// Ensure that all returned documents contain all fields
+			for (Document document : resultCursor) {
+				assertNotNull(document.get(MONGO_ID), "Document in returned list should contain all fields");
+				assertNotNull(document.get("scientist"), "Document in returned list should contain all fields");
+				assertNotNull(document.get("fixedField"), "Document in returned list should contain all fields");
+			}
+
+			numToSkip = numToSkip + limit;
+		}
+	}
+
+	@Test
+	public void testFindAllDocumentList() {
+		// Test that the collection has 0 documents in it
+		assertEquals(0, testCollection.countDocuments());
+		pumpDataIntoTestCollection();
+		Object result = template.requestBody("direct:findAllDocumentList", ObjectUtils.NULL);
+		assertTrue(result instanceof List, "Result is not of type List");
+		@SuppressWarnings("unchecked")
+		List<Document> resultList = (List<Document>) result;
+
+		assertListSize("Result does not contain 1000 elements", resultList, 1000);
+
+		// Ensure that all returned documents contain all fields
+		for (Document document : resultList) {
+			assertNotNull(document.get(MONGO_ID), "Document in returned list should contain all fields");
+			assertNotNull(document.get("scientist"), "Document in returned list should contain all fields");
+			assertNotNull(document.get("fixedField"), "Document in returned list should contain all fields");
+		}
+
+		for (Exchange resultExchange : getMockEndpoint("mock:resultFindAll").getReceivedExchanges()) {
+			assertEquals(1000, resultExchange.getIn().getHeader(MongoDbConstants.RESULT_TOTAL_SIZE),
+					"Result total size header should equal 1000");
+		}
+	}
+
+	@Test
+	public void testInitFindWithWrongOutputType() {
+		try {
+			RouteBuilder taillableRouteBuilder = new RouteBuilder() {
+				@Override
+				public void configure() {
+					from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findById&dynamicity=true&outputType=MongoIterable")
+							.to("mock:dummy");
+				}
+			};
+			template.getCamelContext().addRoutes(taillableRouteBuilder);
+			fail("Endpoint should not be initialized with a non compatible outputType");
+		} catch (Exception exception) {
+			assertTrue(exception.getCause() instanceof IllegalArgumentException,
+					"Exception is not of type IllegalArgumentException");
+		}
+	}
+
+	@Test
+	public void testInitTailWithWrongOutputType() {
+		try {
+			RouteBuilder taillableRouteBuilder = new RouteBuilder() {
+				@Override
+				public void configure() {
+					from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing&outputType=MongoIterable")
+							.id("tailableCursorConsumer1").autoStartup(false).to("mock:test");
+				}
+			};
+			template.getCamelContext().addRoutes(taillableRouteBuilder);
+			fail("Endpoint should not be initialized with a non compatible outputType");
+		} catch (Exception exception) {
+			assertTrue(exception.getCause() instanceof IllegalArgumentException,
+					"Exception is not of type IllegalArgumentException");
+		}
+	}
+
+	@Configuration
+	public class TestConfiguration {
+
+		@Bean
+		public RouteBuilder routeBuilder() {
+			return new RouteBuilder() {
+				@Override
+				public void configure() {
+					from("direct:findAllDBCursor")
+							.to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findAll&dynamicity=true&outputType=MongoIterable")
+							.to("mock:resultFindAllDBCursor");
+					from("direct:findAllDocumentList").to(
+									"mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findAll&outputType=DocumentList")
+							.to("mock:resultFindAllDocumentList");
+				}
+			};
+		}
+	}
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbReadPreferenceOptionIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbReadPreferenceOptionIT.java
new file mode 100644
index 0000000..9227216
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbReadPreferenceOptionIT.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import com.mongodb.ReadPreference;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.mongodb.MongoDbEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbReadPreferenceOptionIT.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbReadPreferenceOptionIT extends AbstractMongoDbITSupport {
+
+    private MongoDbEndpoint endpoint;
+
+    @Test
+    public void testInvalidReadPreferenceOptionValue() throws Exception {
+        endpoint = createMongoDbEndpoint("mongodb:myDb?database={{mongodb.testDb}}&readPreference=foo");
+
+        Exception ex = assertThrows(IllegalArgumentException.class,
+                () -> endpoint.getReadPreferenceBean());
+
+        assertTrue(ex.getMessage().startsWith("No match for read preference"));
+    }
+
+    @Test
+    public void testNoReadPreferenceOptionValue() throws Exception {
+        endpoint = createMongoDbEndpoint("mongodb:myDb?database={{mongodb.testDb}}");
+        assertSame(ReadPreference.primary(), endpoint.getReadPreferenceBean());
+        // the default is primary
+    }
+
+    @Test
+    public void testPrimaryReadPreferenceOptionValue() throws Exception {
+        endpoint = createMongoDbEndpoint("mongodb:myDb?database={{mongodb.testDb}}&readPreference=primary");
+        assertSame(ReadPreference.primary(), endpoint.getReadPreferenceBean());
+    }
+
+    @Test
+    public void testPrimaryPreferredReadPreferenceOptionValue() throws Exception {
+        endpoint = createMongoDbEndpoint("mongodb:myDb?database={{mongodb.testDb}}&readPreference=primaryPreferred");
+        assertSame(ReadPreference.primaryPreferred(), endpoint.getReadPreferenceBean());
+    }
+
+    @Test
+    public void testSecondaryReadPreferenceOptionValue() throws Exception {
+        endpoint = createMongoDbEndpoint("mongodb:myDb?database={{mongodb.testDb}}&readPreference=secondary");
+        assertSame(ReadPreference.secondary(), endpoint.getReadPreferenceBean());
+    }
+
+    @Test
+    public void testSecondaryPreferredReadPreferenceOptionValue() throws Exception {
+        endpoint = createMongoDbEndpoint("mongodb:myDb?database={{mongodb.testDb}}&readPreference=secondaryPreferred");
+        assertSame(ReadPreference.secondaryPreferred(), endpoint.getReadPreferenceBean());
+    }
+
+    @Test
+    public void testNearestReadPreferenceOptionValue() throws Exception {
+        endpoint = createMongoDbEndpoint("mongodb:myDb?database={{mongodb.testDb}}&readPreference=nearest");
+        assertSame(ReadPreference.nearest(), endpoint.getReadPreferenceBean());
+    }
+
+    private MongoDbEndpoint createMongoDbEndpoint(String uri) throws Exception {
+        Endpoint mongoEndpoint = context.getComponent("mongodb").createEndpoint(uri);
+        mongoEndpoint.start();
+        return MongoDbEndpoint.class.cast(mongoEndpoint);
+    }
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbSpringDslOperationsIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbSpringDslOperationsIT.java
new file mode 100644
index 0000000..8351d6f
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbSpringDslOperationsIT.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.SpringCamelContext;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.test.annotation.DirtiesContext;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbSpringDslOperationsIT.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        },
+        properties = { "camel.springboot.routes-include-pattern=file:src/test/resources/org/apache/camel/component/mongodb/mongoBasicOperationsTest.xml" }
+)
+public class MongoDbSpringDslOperationsIT extends MongoDbOperationsIT {
+
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbStopEndpointIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbStopEndpointIT.java
new file mode 100644
index 0000000..7576bb8
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbStopEndpointIT.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDbEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbStopEndpointIT.class,
+                MongoDbStopEndpointIT.TestConfiguration.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        },
+        properties = { "mongodb.testDb=test", "mongodb.testCollection=camelTest" }
+)
+public class MongoDbStopEndpointIT extends AbstractMongoDbITSupport {
+
+    private static final String MY_ID = "myId";
+
+    String endpoint = "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert";
+
+    @Configuration
+    public class TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:insertJsonString").routeId("insert").to(endpoint);
+                    from("direct:findById").routeId("find").to(
+                            "mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=findById&dynamicity=true");
+                }
+            };
+        }
+    }
+
+    @Test
+    public void testStopEndpoint() {
+        assertEquals(0, testCollection.countDocuments());
+
+        template.requestBody("direct:insertJsonString", "{\"scientist\": \"Newton\", \"_id\": \"" + MY_ID + "\"}");
+
+        context.getEndpoint(endpoint).stop();
+
+        Document result = template.requestBody("direct:findById", MY_ID, Document.class);
+
+        assertEquals(MY_ID, result.get(MONGO_ID));
+        assertEquals("Newton", result.get("scientist"));
+    }
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbTailableCursorConsumerIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbTailableCursorConsumerIT.java
new file mode 100644
index 0000000..4ad4b70
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/integration/MongoDbTailableCursorConsumerIT.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.integration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import static com.mongodb.client.model.Filters.eq;
+
+import org.apache.camel.Route;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.mongodb.MongoDbTailTrackingConfig;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.CreateCollectionOptions;
+
+import java.util.Calendar;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+		classes = {
+				CamelAutoConfiguration.class,
+				MongoDbTailableCursorConsumerIT.class,
+				AbstractMongoDbITSupport.MongoConfiguration.class
+		}
+)
+public class MongoDbTailableCursorConsumerIT extends AbstractMongoDbITSupport {
+
+	private MongoCollection<Document> cappedTestCollection;
+	private String cappedTestCollectionName = "camelTestCapped";
+
+	@Test
+	public void testThousandRecordsWithoutReadPreference() throws Exception {
+		testThousandRecordsWithRouteId("tailableCursorConsumer1");
+	}
+
+	@Test
+	public void testThousandRecordsWithReadPreference() throws Exception {
+		testThousandRecordsWithRouteId("tailableCursorConsumer1.readPreference");
+	}
+
+	@Test
+	public void testNoRecords() throws Exception {
+		assertEquals(0, cappedTestCollection.countDocuments());
+		// DocumentBuilder.start().add("capped", true).add("size",
+		// 1000000000).add("max", 1000).get()
+		// create a capped collection with max = 1000
+		CreateCollectionOptions collectionOptions
+				= new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000);
+		db.createCollection(cappedTestCollectionName, collectionOptions);
+		cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+		assertEquals(0, cappedTestCollection.countDocuments());
+
+		addTestRoutes();
+		context.getRouteController().startRoute("tailableCursorConsumer1");
+		MockEndpoint mock = getMockEndpoint("mock:test");
+		mock.expectedMessageCount(0);
+		Thread.sleep(1000);
+		mock.assertIsSatisfied();
+		context.getRouteController().stopRoute("tailableCursorConsumer1");
+	}
+
+	@Test
+	public void testMultipleBursts() throws Exception {
+		assertEquals(0, cappedTestCollection.countDocuments());
+		// DocumentBuilder.start().add("capped", true).add("size",
+		// 1000000000).add("max", 1000).get()
+		// create a capped collection with max = 1000
+		CreateCollectionOptions createCollectionOptions
+				= new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000);
+		db.createCollection(cappedTestCollectionName, createCollectionOptions);
+		cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+		addTestRoutes();
+		context.getRouteController().startRoute("tailableCursorConsumer1");
+
+		MockEndpoint mock = getMockEndpoint("mock:test");
+		mock.expectedMessageCount(5000);
+
+		// pump 5 bursts of 1000 records each with 500ms pause between burst and
+		// burst
+		Thread t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				for (int i = 0; i < 5000; i++) {
+					if (i % 1000 == 0) {
+						try {
+							Thread.sleep(500);
+						} catch (InterruptedException e) {
+							return;
+						}
+					}
+					cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+				}
+			}
+		});
+
+		// start the data pumping
+		t.start();
+		// before we assert, wait for the data pumping to end
+		t.join();
+
+		mock.assertIsSatisfied();
+		context.getRouteController().stopRoute("tailableCursorConsumer1");
+	}
+
+	@Test
+	public void testHundredThousandRecords() throws Exception {
+		assertEquals(0, cappedTestCollection.countDocuments());
+
+		// create a capped collection with max = 1000
+		// DocumentBuilder.start().add("capped", true).add("size",
+		// 1000000000).add("max", 1000).get())
+		db.createCollection(cappedTestCollectionName,
+				new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000));
+		cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+		addTestRoutes();
+		context.getRouteController().startRoute("tailableCursorConsumer1");
+
+		final MockEndpoint mock = getMockEndpoint("mock:test");
+		mock.expectedMessageCount(1000);
+
+		// continuous pump of 100000 records, asserting incrementally to reduce
+		// overhead on the mock endpoint
+		Thread t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				for (int i = 1; i <= 100000; i++) {
+					cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+
+					// incrementally assert, as the mock endpoint stores all
+					// messages and otherwise the test would be sluggish
+					if (i % 1000 == 0) {
+						try {
+							MongoDbTailableCursorConsumerIT.this.assertAndResetMockEndpoint(mock);
+						} catch (Exception e) {
+							return;
+						}
+					}
+				}
+			}
+		});
+
+		// start the data pumping
+		t.start();
+		// before we stop the route, wait for the data pumping to end
+		t.join();
+
+		context.getRouteController().stopRoute("tailableCursorConsumer1");
+	}
+
+	@Test
+	public void testPersistentTailTrack() throws Exception {
+		assertEquals(0, cappedTestCollection.countDocuments());
+
+		// drop the tracking collection
+		db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION).drop();
+		// create a capped collection with max = 1000
+		// DocumentBuilder.start().add("capped", true).add("size",
+		// 1000000000).add("max", 1000).get()
+		db.createCollection(cappedTestCollectionName,
+				new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000));
+		cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+		cappedTestCollection.createIndex(new Document("increasing", 1));
+
+		addTestRoutes();
+		context.getRouteController().startRoute("tailableCursorConsumer2");
+
+		final MockEndpoint mock = getMockEndpoint("mock:test");
+
+		mock.expectedMessageCount(300);
+		// pump 300 records
+		Thread t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				for (int i = 1; i <= 300; i++) {
+					cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+				}
+			}
+		});
+
+		// start the data pumping
+		t.start();
+		// before we continue wait for the data pump to end
+		t.join();
+		mock.assertIsSatisfied();
+		mock.reset();
+		context.getRouteController().stopRoute("tailableCursorConsumer2");
+		while (context.getRouteController().getRouteStatus("tailableCursorConsumer2") != ServiceStatus.Stopped) {
+		}
+		context.getRouteController().startRoute("tailableCursorConsumer2");
+
+		// expect 300 messages and not 600
+		mock.expectedMessageCount(300);
+		// pump 300 records
+		t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				for (int i = 301; i <= 600; i++) {
+					cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+				}
+			}
+		});
+		// start the data pumping
+		t.start();
+		// before we continue wait for the data pump to end
+		t.join();
+		mock.assertIsSatisfied();
+
+		// check that the first message received in this second batch
+		// corresponds to increasing=301
+		Object firstBody = mock.getExchanges().get(0).getIn().getBody();
+		assertTrue(firstBody instanceof Document);
+		assertEquals(301, Document.class.cast(firstBody).get("increasing"));
+
+		// check that the lastVal is persisted at the right time: check before
+		// and after stopping the route
+		assertEquals(300, db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION).find(eq("persistentId", "darwin"))
+				.first().get("lastTrackingValue"));
+		// stop the route and verify the last value has been updated
+		context.getRouteController().stopRoute("tailableCursorConsumer2");
+		while (context.getRouteController().getRouteStatus("tailableCursorConsumer2") != ServiceStatus.Stopped) {
+		}
+		assertEquals(600, db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION).find(eq("persistentId", "darwin"))
+				.first().get("lastTrackingValue"));
+	}
+
+	@Test
+	public void testPersistentTailTrackIncreasingDateField() throws Exception {
+		assertEquals(0, cappedTestCollection.countDocuments());
+		final Calendar startTimestamp = Calendar.getInstance();
+
+		// get default tracking collection
+		MongoCollection<Document> trackingCol = db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION, Document.class);
+		trackingCol.drop();
+		trackingCol = db.getCollection(MongoDbTailTrackingConfig.DEFAULT_COLLECTION, Document.class);
+
+		// create a capped collection with max = 1000
+		// DocumentBuilder.start().add("capped", true).add("size",
+		// 1000000000).add("max", 1000).get()
+		db.createCollection(cappedTestCollectionName,
+				new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000));
+		cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+		addTestRoutes();
+		context.getRouteController().startRoute("tailableCursorConsumer2");
+
+		final MockEndpoint mock = getMockEndpoint("mock:test");
+		mock.expectedMessageCount(300);
+		// pump 300 records
+		Thread t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				for (int i = 1; i <= 300; i++) {
+					Calendar c = (Calendar) (startTimestamp.clone());
+					c.add(Calendar.MINUTE, i);
+					cappedTestCollection.insertOne(new Document("increasing", c.getTime()).append("string", "value" + i));
+				}
+			}
+		});
+
+		// start the data pumping
+		t.start();
+		// before we continue wait for the data pump to end
+		t.join();
+		mock.assertIsSatisfied();
+		mock.reset();
+		// ensure that the persisted lastVal is startTimestamp + 300min
+		Calendar cal300 = (Calendar) startTimestamp.clone();
+		cal300.add(Calendar.MINUTE, 300);
+		context.getRouteController().stopRoute("tailableCursorConsumer2");
+		assertEquals(cal300.getTime(),
+				trackingCol.find(eq("persistentId", "darwin")).first().get(MongoDbTailTrackingConfig.DEFAULT_FIELD));
+		context.getRouteController().startRoute("tailableCursorConsumer2");
+
+		// expect 300 messages and not 600
+		mock.expectedMessageCount(300);
+		// pump 300 records
+		t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				for (int i = 301; i <= 600; i++) {
+					Calendar c = (Calendar) (startTimestamp.clone());
+					c.add(Calendar.MINUTE, i);
+					cappedTestCollection.insertOne(new Document("increasing", c.getTime()).append("string", "value" + i));
+				}
+			}
+		});
+		// start the data pumping
+		t.start();
+		// before we continue wait for the data pump to end
+		t.join();
+		mock.assertIsSatisfied();
+		Object firstBody = mock.getExchanges().get(0).getIn().getBody();
+		assertTrue(firstBody instanceof Document);
+		Calendar cal301 = Calendar.class.cast(startTimestamp.clone());
+		cal301.add(Calendar.MINUTE, 301);
+		assertEquals(cal301.getTime(), Document.class.cast(firstBody).get("increasing"));
+		// check that the persisted lastVal after stopping the route is
+		// startTimestamp + 600min
+		context.getRouteController().stopRoute("tailableCursorConsumer2");
+		Calendar cal600 = (Calendar) startTimestamp.clone();
+		cal600.add(Calendar.MINUTE, 600);
+		assertEquals(cal600.getTime(),
+				trackingCol.find(eq("persistentId", "darwin")).first().get(MongoDbTailTrackingConfig.DEFAULT_FIELD));
+	}
+
+	@Test
+	public void testCustomTailTrackLocation() throws Exception {
+		assertEquals(0, cappedTestCollection.countDocuments());
+
+		// get the custom tracking collection and drop it
+		// (tailTrackDb=einstein&tailTrackCollection=curie&tailTrackField=newton)
+		MongoCollection<Document> trackingCol = mongo.getDatabase("einstein").getCollection("curie", Document.class);
+		trackingCol.drop();
+		trackingCol = mongo.getDatabase("einstein").getCollection("curie", Document.class);
+
+		// create a capped collection with max = 1000
+		// DocumentBuilder.start().add("capped", true).add("size",
+		// 1000000000).add("max", 1000).get()
+		db.createCollection(cappedTestCollectionName,
+				new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000));
+		cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+		addTestRoutes();
+		context.getRouteController().startRoute("tailableCursorConsumer3");
+
+		final MockEndpoint mock = getMockEndpoint("mock:test");
+		mock.expectedMessageCount(300);
+		// pump 300 records
+		Thread t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				for (int i = 1; i <= 300; i++) {
+					cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+				}
+			}
+		});
+
+		// start the data pumping
+		t.start();
+		// before we continue wait for the data pump to end
+		t.join();
+		mock.assertIsSatisfied();
+		mock.reset();
+
+		// stop the route to ensure that our lastVal is persisted, and check it
+		context.getRouteController().stopRoute("tailableCursorConsumer3");
+		// ensure that the persisted lastVal is 300, newton is the name of the
+		// trackingField we are using
+		assertEquals(300, trackingCol.find(eq("persistentId", "darwin")).first().get("newton"));
+		context.getRouteController().startRoute("tailableCursorConsumer3");
+
+		// expect 300 messages and not 600
+		mock.expectedMessageCount(300);
+		// pump 300 records
+		t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				for (int i = 301; i <= 600; i++) {
+					cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+				}
+			}
+		});
+		// start the data pumping
+		t.start();
+		// before we continue wait for the data pump to end
+		t.join();
+		mock.assertIsSatisfied();
+		// check that the first received body contains increasing=301 and not
+		// increasing=1, i.e. it's not starting from the top
+		Object firstBody = mock.getExchanges().get(0).getIn().getBody();
+		assertTrue(firstBody instanceof Document);
+		assertEquals(301, (Document.class.cast(firstBody)).get("increasing"));
+		// check that the persisted lastVal after stopping the route is 600,
+		// newton is the name of the trackingField we are using
+		context.getRouteController().stopRoute("tailableCursorConsumer3");
+		assertEquals(600, trackingCol.find(eq("persistentId", "darwin")).first().get("newton"));
+	}
+
+	public void assertAndResetMockEndpoint(MockEndpoint mock) throws Exception {
+		mock.assertIsSatisfied();
+		mock.reset();
+	}
+
+	private void testThousandRecordsWithRouteId(String routeId) throws Exception {
+		assertEquals(0, cappedTestCollection.countDocuments());
+
+		// create a capped collection with max = 1000
+		// DocumentBuilder.start().add("capped", true).add("size",
+		// 1000000000).add("max", 1000).get()
+		db.createCollection(cappedTestCollectionName,
+				new CreateCollectionOptions().capped(true).sizeInBytes(1000000000).maxDocuments(1000));
+		cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+		for (int i = 0; i < 1000; i++) {
+			cappedTestCollection.insertOne(new Document("increasing", i).append("string", "value" + i));
+		}
+		assertEquals(1000, cappedTestCollection.countDocuments());
+
+		addTestRoutes();
+		context.getRouteController().startRoute(routeId);
+		MockEndpoint mock = getMockEndpoint("mock:test");
+		mock.expectedMessageCount(1000);
+		Thread.sleep(1000);
+		mock.assertIsSatisfied();
+		context.getRouteController().stopRoute(routeId);
+	}
+
+	@BeforeEach
+	public void before() {
+		// drop the capped collection and let each test create what it needs
+		cappedTestCollection = db.getCollection(cappedTestCollectionName, Document.class);
+		cappedTestCollection.drop();
+	}
+
+	protected void addTestRoutes() throws Exception {
+		context.addRoutes(new RouteBuilder() {
+
+			@Override
+			public void configure() {
+
+				from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing")
+						.id("tailableCursorConsumer1")
+						.autoStartup(false).to("mock:test");
+
+				from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing&persistentTailTracking=true&persistentId=darwin")
+						.id("tailableCursorConsumer2").autoStartup(false).to("mock:test");
+
+				from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing&"
+						+ "persistentTailTracking=true&persistentId=darwin&tailTrackDb=einstein&tailTrackCollection=curie&tailTrackField=newton")
+						.id("tailableCursorConsumer3")
+						.autoStartup(false).to("mock:test");
+
+				from("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.cappedTestCollection}}&tailTrackIncreasingField=increasing")// &readPreference=primary")
+						.id("tailableCursorConsumer1.readPreference").autoStartup(false).to("mock:test");
+			}
+		});
+	}
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/meta/integration/MongoDbMetaExtensionIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/meta/integration/MongoDbMetaExtensionIT.java
new file mode 100644
index 0000000..d21e466
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/meta/integration/MongoDbMetaExtensionIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.meta.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.mongodb.client.model.CreateCollectionOptions;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.ValidationOptions;
+import org.apache.camel.component.extension.MetaDataExtension;
+import org.apache.camel.component.mongodb.MongoDbComponent;
+import org.apache.camel.component.mongodb.integration.AbstractMongoDbITSupport;
+import org.apache.camel.component.mongodb.integration.MongoDbDynamicityIT;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbMetaExtensionIT.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbMetaExtensionIT extends AbstractMongoDbITSupport {
+    // We simulate the presence of an authenticated user
+    @BeforeEach
+    public void createAuthorizationUser() {
+        super.createAuthorizationUser();
+    }
+
+    protected MongoDbComponent getComponent() {
+        return context.getComponent(SCHEME, MongoDbComponent.class);
+    }
+
+    @Test
+    public void testValidMetaData() {
+        // When
+        final String database = "test";
+        final String collection = "validatedCollection";
+        MongoDbComponent component = this.getComponent();
+        // Given
+        Document jsonSchema = Document.parse("{ \n"
+                                             + "      bsonType: \"object\", \n"
+                                             + "      required: [ \"name\", \"surname\", \"email\" ], \n"
+                                             + "      properties: { \n"
+                                             + "         name: { \n"
+                                             + "            bsonType: \"string\", \n"
+                                             + "            description: \"required and must be a string\" }, \n"
+                                             + "         surname: { \n"
+                                             + "            bsonType: \"string\", \n"
+                                             + "            description: \"required and must be a string\" }, \n"
+                                             + "         email: { \n"
+                                             + "            bsonType: \"string\", \n"
+                                             + "            pattern: \"^.+@.+$\", \n"
+                                             + "            description: \"required and must be a valid email address\" }, \n"
+                                             + "         year_of_birth: { \n"
+                                             + "            bsonType: \"int\", \n"
+                                             + "            minimum: 1900, \n"
+                                             + "            maximum: 2018,\n"
+                                             + "            description: \"the value must be in the range 1900-2018\" }, \n"
+                                             + "         gender: { \n"
+                                             + "            enum: [ \"M\", \"F\" ], \n"
+                                             + "            description: \"can be only M or F\" } \n"
+                                             + "      }}");
+        ValidationOptions collOptions = new ValidationOptions().validator(Filters.jsonSchema(jsonSchema));
+        AbstractMongoDbITSupport.mongo.getDatabase(database).createCollection(collection,
+                new CreateCollectionOptions().validationOptions(collOptions));
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("database", database);
+        parameters.put("collection", collection);
+        parameters.put("host", service.getConnectionAddress());
+        parameters.put("user", USER);
+        parameters.put("password", PASSWORD);
+
+        MetaDataExtension.MetaData result = component.getExtension(MetaDataExtension.class).get().meta(parameters)
+                .orElseThrow(UnsupportedOperationException::new);
+        // Then
+        assertEquals("application/schema+json", result.getAttribute(MetaDataExtension.MetaData.CONTENT_TYPE));
+        assertEquals(JsonNode.class, result.getAttribute(MetaDataExtension.MetaData.JAVA_TYPE));
+        assertNotNull(result.getPayload(JsonNode.class));
+        assertNotNull(result.getPayload(JsonNode.class).get("properties"));
+        assertNotNull(result.getPayload(JsonNode.class).get("$schema"));
+        assertEquals("http://json-schema.org/schema#", result.getPayload(JsonNode.class).get("$schema").asText());
+        assertNotNull(result.getPayload(JsonNode.class).get("id"));
+        assertNotNull(result.getPayload(JsonNode.class).get("type"));
+    }
+
+    @Test
+    public void testMissingCollection() {
+        // When
+        final String database = "test";
+        final String collection = "missingCollection";
+        MongoDbComponent component = this.getComponent();
+        // Given
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("database", database);
+        parameters.put("collection", collection);
+        parameters.put("host", service.getConnectionAddress());
+        parameters.put("user", USER);
+        parameters.put("password", PASSWORD);
+
+        final Optional<MetaDataExtension.MetaData> meta
+                = component.getExtension(MetaDataExtension.class).get().meta(parameters);
+
+        // Then
+        assertThrows(IllegalArgumentException.class, () -> meta.orElseThrow(IllegalArgumentException::new));
+    }
+
+    @Test
+    public void testMissingParameters() {
+        // When
+        MongoDbComponent component = this.getComponent();
+        // Given
+        Map<String, Object> parameters = new HashMap<>();
+
+        final MetaDataExtension metaDataExtension = component.getExtension(MetaDataExtension.class).get();
+
+        // Then
+        assertThrows(IllegalArgumentException.class, () -> metaDataExtension.meta(parameters));
+    }
+
+    @Test
+    public void testNotValidatedCollection() {
+        // When
+        final String database = "test";
+        final String collection = "notValidatedCollection";
+        MongoDbComponent component = this.getComponent();
+        AbstractMongoDbITSupport.mongo.getDatabase(database).createCollection(collection);
+        // Given
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("database", database);
+        parameters.put("collection", collection);
+        parameters.put("host", service.getConnectionAddress());
+        parameters.put("user", USER);
+        parameters.put("password", PASSWORD);
+
+        final Optional<MetaDataExtension.MetaData> meta
+                = component.getExtension(MetaDataExtension.class).get().meta(parameters);
+
+        // Then
+        assertThrows(UnsupportedOperationException.class, () -> meta.orElseThrow(UnsupportedOperationException::new));
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/processor/idempotent/MongoDbIdempotentRepositoryIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/processor/idempotent/MongoDbIdempotentRepositoryIT.java
new file mode 100644
index 0000000..24bb00a
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/processor/idempotent/MongoDbIdempotentRepositoryIT.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.processor.idempotent;
+
+import java.util.UUID;
+
+import org.apache.camel.component.mongodb.integration.AbstractMongoDbITSupport;
+import org.apache.camel.component.mongodb.meta.integration.MongoDbMetaExtensionIT;
+import org.apache.camel.component.mongodb.processor.idempotent.MongoDbIdempotentRepository;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.bson.Document;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbIdempotentRepositoryIT.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbIdempotentRepositoryIT extends AbstractMongoDbITSupport {
+
+    MongoDbIdempotentRepository repo;
+
+    @BeforeEach
+    @AfterEach
+    public void clearDB() {
+        testCollection.deleteMany(new Document());
+        repo = new MongoDbIdempotentRepository(mongo, testCollectionName, dbName);
+    }
+
+    @Test
+    public void add() {
+        String randomUUIDString = UUID.randomUUID().toString();
+
+        boolean added = repo.add(randomUUIDString);
+        assertEquals(1, testCollection.countDocuments(), "Driver inserted document");
+        assertTrue(added, "Add ui returned true");
+    }
+
+    @Test
+    public void addAndContains() {
+        String randomUUIDString = UUID.randomUUID().toString();
+
+        repo.add(randomUUIDString);
+        assertEquals(1, testCollection.countDocuments());
+
+        boolean found = repo.contains(randomUUIDString);
+        assertTrue(found, "Added uid was found");
+    }
+
+    @Test
+    public void addAndRemove() {
+        String randomUUIDString = UUID.randomUUID().toString();
+
+        repo.add(randomUUIDString);
+        assertEquals(1, testCollection.countDocuments());
+
+        boolean removed = repo.remove(randomUUIDString);
+        assertTrue(removed, "Added uid was removed correctly");
+        assertEquals(0, testCollection.countDocuments());
+    }
+
+    @Test
+    public void addDuplicatedFails() {
+        String randomUUIDString = UUID.randomUUID().toString();
+
+        repo.add(randomUUIDString);
+        assertEquals(1, testCollection.countDocuments());
+
+        boolean added = repo.add(randomUUIDString);
+        assertTrue(!added, "Duplicated entry was not added");
+        assertEquals(1, testCollection.countDocuments());
+    }
+
+    @Test
+    public void deleteMissingiIsFailse() {
+        String randomUUIDString = UUID.randomUUID().toString();
+        assertEquals(0, testCollection.countDocuments());
+        boolean removed = repo.remove(randomUUIDString);
+        assertTrue(!removed, "Non exisint uid returns false");
+    }
+
+    @Test
+    public void containsMissingReturnsFalse() {
+        String randomUUIDString = UUID.randomUUID().toString();
+        boolean found = repo.contains(randomUUIDString);
+        assertTrue(!found, "Non existing item is not found");
+    }
+
+    @Test
+    public void confirmAllwaysReturnsTrue() {
+        String randomUUIDString = UUID.randomUUID().toString();
+        boolean found = repo.confirm(randomUUIDString);
+        assertTrue(found, "Confirm always returns true");
+
+        found = repo.confirm(null);
+        assertTrue(found, "Confirm always returns true, even with null");
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/verifier/integration/MongoDbVerifierExtensionIT.java b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/verifier/integration/MongoDbVerifierExtensionIT.java
new file mode 100644
index 0000000..2e543dc
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/java/org/apache/camel/component/mongodb/verifier/integration/MongoDbVerifierExtensionIT.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb.verifier.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Component;
+import org.apache.camel.component.extension.ComponentVerifierExtension;
+import org.apache.camel.component.mongodb.integration.AbstractMongoDbITSupport;
+import org.apache.camel.component.mongodb.processor.idempotent.MongoDbIdempotentRepositoryIT;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                MongoDbVerifierExtensionIT.class,
+                AbstractMongoDbITSupport.MongoConfiguration.class
+        }
+)
+public class MongoDbVerifierExtensionIT extends AbstractMongoDbITSupport {
+    // We simulate the presence of an authenticated user
+    @BeforeEach
+    public void createAuthorizationUser() {
+        super.createAuthorizationUser();
+    }
+
+    protected ComponentVerifierExtension getExtension() {
+        Component component = context.getComponent(SCHEME);
+        ComponentVerifierExtension verifier
+                = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+        return verifier;
+    }
+
+    @Test
+    public void verifyConnectionOK() {
+        //When
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("host", service.getConnectionAddress());
+        parameters.put("user", USER);
+        parameters.put("password", PASSWORD);
+        //Given
+        ComponentVerifierExtension.Result result
+                = getExtension().verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+        //Then
+        assertEquals(ComponentVerifierExtension.Result.Status.OK, result.getStatus());
+    }
+
+    @Test
+    public void verifyConnectionKO() {
+        //When
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("host", "notReachable.host");
+        parameters.put("user", USER);
+        parameters.put("password", PASSWORD);
+        //Given
+        ComponentVerifierExtension.Result result
+                = getExtension().verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+        //Then
+        assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+        assertTrue(result.getErrors().get(0).getDescription().startsWith("Unable to connect"));
+    }
+
+    @Test
+    public void verifyConnectionMissingParams() {
+        //When
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("host", service.getConnectionAddress());
+        parameters.put("user", USER);
+        //Given
+        ComponentVerifierExtension.Result result
+                = getExtension().verify(ComponentVerifierExtension.Scope.PARAMETERS, parameters);
+        //Then
+        assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+        assertTrue(result.getErrors().get(0).getDescription().startsWith("password should be set"));
+    }
+
+    @Test
+    public void verifyConnectionNotAuthenticated() {
+        //When
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("host", service.getConnectionAddress());
+        parameters.put("user", USER);
+        parameters.put("password", "wrongPassword");
+        //Given
+        ComponentVerifierExtension.Result result
+                = getExtension().verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+        //Then
+        assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+        assertTrue(result.getErrors().get(0).getDescription().startsWith("Unable to authenticate"));
+    }
+
+    @Test
+    public void verifyConnectionAdminDBKO() {
+        //When
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("host", service.getConnectionAddress());
+        parameters.put("user", USER);
+        parameters.put("password", PASSWORD);
+        parameters.put("adminDB", "someAdminDB");
+        //Given
+        ComponentVerifierExtension.Result result
+                = getExtension().verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+        //Then
+        assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+        assertTrue(result.getErrors().get(0).getDescription().startsWith("Unable to authenticate"));
+    }
+
+    @Test
+    public void verifyConnectionPortKO() {
+        //When
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("host", "localhost:12343");
+        parameters.put("user", USER);
+        parameters.put("password", PASSWORD);
+        //Given
+        ComponentVerifierExtension.Result result
+                = getExtension().verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+        //Then
+        assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+        assertTrue(result.getErrors().get(0).getDescription().startsWith("Unable to connect"));
+    }
+
+}
diff --git a/components-starter/camel-mongodb-starter/src/test/resources/log4j2.properties b/components-starter/camel-mongodb-starter/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..0689548
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/resources/log4j2.properties
@@ -0,0 +1,33 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.file.type = File
+appender.file.name = file
+appender.file.fileName = target/camel-mongodb-test.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+
+logger.mongodb.name = org.apache.camel.component.mongodb.MongoDbContainer
+logger.mongodb.level = INFO
+
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = file
+#rootLogger.appenderRef.out.ref = out
diff --git a/components-starter/camel-mongodb-starter/src/test/resources/mongodb.test.properties b/components-starter/camel-mongodb-starter/src/test/resources/mongodb.test.properties
new file mode 100644
index 0000000..a80317c
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/resources/mongodb.test.properties
@@ -0,0 +1,23 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+
+mongodb.testDb=test
+mongodb.testCollection=camelTest
+mongodb.cappedTestCollection=camelTestCapped
+
+myStreamFilter = { '$match':{'$or':[{'fullDocument.string': 'value2'}]} }
diff --git a/components-starter/camel-mongodb-starter/src/test/resources/org/apache/camel/component/mongodb/mongoBasicOperationsTest.xml b/components-starter/camel-mongodb-starter/src/test/resources/org/apache/camel/component/mongodb/mongoBasicOperationsTest.xml
new file mode 100644
index 0000000..9f08fa0
--- /dev/null
+++ b/components-starter/camel-mongodb-starter/src/test/resources/org/apache/camel/component/mongodb/mongoBasicOperationsTest.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<routes id="camel" xmlns="http://camel.apache.org/schema/spring">
+    <route>
+        <from uri="direct:count" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=count&amp;dynamicity=true" />
+    </route>
+
+    <route>
+        <from uri="direct:insert" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=insert" /> <!-- &amp;writeConcern=SAFE" /> -->
+    </route>
+
+    <route>
+        <from uri="direct:testStoreOidOnInsert" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=insert" /> <!-- &amp;writeConcern=SAFE" /> -->
+        <setBody>
+            <header>CamelMongoOid</header>
+        </setBody>
+    </route>
+
+    <route>
+        <from uri="direct:save" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=save" /> <!-- &amp;writeConcern=SAFE" /> -->
+    </route>
+
+    <route>
+        <from uri="direct:testStoreOidOnSave" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=save" /> <!-- &amp;writeConcern=SAFE" /> -->
+        <setBody>
+            <header>CamelMongoOid</header>
+        </setBody>
+    </route>
+
+    <route>
+        <from uri="direct:update" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=update" /> <!-- &amp;writeConcern=SAFE" /> -->
+    </route>
+
+    <route>
+        <from uri="direct:remove" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=remove" /> <!-- &amp;writeConcern=SAFE" /> -->
+    </route>
+
+    <route>
+        <from uri="direct:aggregate" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=aggregate" /> <!-- &amp;writeConcern=SAFE" /> -->
+    </route>
+
+    <route>
+        <from uri="direct:getDbStats" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;operation=getDbStats" />
+    </route>
+
+    <route>
+        <from uri="direct:getColStats" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;collection={{mongodb.testCollection}}&amp;operation=getColStats" />
+    </route>
+
+    <route>
+        <from uri="direct:command" />
+        <to uri="mongodb:myDb?database={{mongodb.testDb}}&amp;operation=command" />
+    </route>
+</routes>