Posted to commits@camel.apache.org by ac...@apache.org on 2016/12/18 09:07:20 UTC

[04/13] camel git commit: CAMEL-10554 - camel-mongodb evolution to driver 3. Merging the component. Thanks to Jean-Yves Terrien for the contribution.

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbOutputType.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbOutputType.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbOutputType.java
new file mode 100644
index 0000000..694e054
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbOutputType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+public enum MongoDbOutputType {
+    DocumentList, //List<Document>
+    Document, //Document
+    MongoIterable //MongoIterable
+}
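
The producer's findAll operation (see MongoDbProducer below) collects results into a List<Document> unless outputType is MongoIterable, in which case the iterable itself is returned. A minimal Java DSL sketch of selecting the output type on the endpoint URI, to be read as an illustration only: the mongodb3 scheme, the "myDb" connection bean and the flights/tickets names are assumptions, and the route belongs inside a RouteBuilder configure() method:

    from("direct:findAll")
        .to("mongodb3:myDb?database=flights&collection=tickets"
            + "&operation=findAll&outputType=DocumentList");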

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
new file mode 100644
index 0000000..d1f04c2
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import static com.mongodb.client.model.Filters.eq;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.BATCH_SIZE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.COLLECTION;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.COLLECTION_INDEX;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.DATABASE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.CRITERIA;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.FIELDS_PROJECTION;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.LIMIT;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MULTIUPDATE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.NUM_TO_SKIP;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.OID;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.OPERATION_HEADER;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.RECORDS_AFFECTED;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.RECORDS_MATCHED;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.RESULT_PAGE_SIZE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.RESULT_TOTAL_SIZE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.SORT_BY;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.UPSERT;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.WRITERESULT;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Processor;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
+//import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.client.AggregateIterable;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.result.DeleteResult;
+import com.mongodb.client.result.UpdateResult;
+
+/**
+ * The MongoDb producer.
+ */
+public class MongoDbProducer extends DefaultProducer {
+	private static final Logger LOG = LoggerFactory.getLogger(MongoDbProducer.class);
+	private final Map<MongoDbOperation, Processor> operations = new HashMap<>();
+	private MongoDbEndpoint endpoint;
+
+	{
+		bind(MongoDbOperation.aggregate, createDoAggregate());
+		bind(MongoDbOperation.command, createDoCommand());
+		bind(MongoDbOperation.count, createDoCount());
+		bind(MongoDbOperation.findAll, createDoFindAll());
+		bind(MongoDbOperation.findById, createDoFindById());
+		bind(MongoDbOperation.findOneByQuery, createDoFindOneByQuery());
+		bind(MongoDbOperation.getColStats, createDoGetColStats());
+		bind(MongoDbOperation.getDbStats, createDoGetDbStats());
+		bind(MongoDbOperation.insert, createDoInsert());
+		bind(MongoDbOperation.remove, createDoRemove());
+		bind(MongoDbOperation.save, createDoSave());
+		bind(MongoDbOperation.update, createDoUpdate());
+	}
+
+	public MongoDbProducer(MongoDbEndpoint endpoint) {
+		super(endpoint);
+		this.endpoint = endpoint;
+	}
+
+	public void process(Exchange exchange) throws Exception {
+		MongoDbOperation operation = endpoint.getOperation();
+		Object header = exchange.getIn().getHeader(OPERATION_HEADER);
+		if (header != null) {
+			LOG.debug("Overriding default operation with operation specified on header: {}", header);
+			try {
+				if (header instanceof MongoDbOperation) {
+					operation = ObjectHelper.cast(MongoDbOperation.class, header);
+				} else {
+					// evaluate as a String
+					operation = MongoDbOperation.valueOf(exchange.getIn().getHeader(OPERATION_HEADER, String.class));
+				}
+			} catch (Exception e) {
+				throw new CamelMongoDbException("Operation specified on header is not supported. Value: " + header, e);
+			}
+		}
+
+		try {
+			invokeOperation(operation, exchange);
+		} catch (Exception e) {
+			throw MongoDbComponent.wrapInCamelMongoDbException(e);
+		}
+
+	}
+
+	/**
+	 * Entry method that selects the appropriate MongoDB operation and executes it.
+	 *
+	 * @param operation the operation to invoke
+	 * @param exchange the exchange carrying the operation input and receiving the result
+	 */
+	protected void invokeOperation(MongoDbOperation operation, Exchange exchange) throws Exception {
+		Processor processor = operations.get(operation);
+		if (processor != null) {
+			processor.process(exchange);
+		} else {
+			throw new CamelMongoDbException("Operation not supported. Value: " + operation);
+		}
+	}
+
+	private MongoDbProducer bind(MongoDbOperation operation, Function<Exchange, Object> mongoDbFunction) {
+		operations.put(operation, wrap(mongoDbFunction, operation));
+		return this;
+	}
+
+	// ----------- MongoDB operations ----------------
+
+	private Document createDbStatsCommand() {
+		return new Document("dbStats", 1).append("scale", 1);
+	}
+
+	private Document createCollStatsCommand(String collectionName) {
+		return new Document("collStats", collectionName);
+	}
+
+
+	// --------- Convenience methods -----------------------
+	private MongoDatabase calculateDb(Exchange exchange) {
+		// dynamic calculation is an option. In most cases it won't be used and we should not penalise all users with running this
+		// resolution logic on every Exchange if they won't be using this functionality at all
+		if (!endpoint.isDynamicity()) {
+			return endpoint.getMongoDatabase();
+		}
+
+		String dynamicDB = exchange.getIn().getHeader(DATABASE, String.class);
+		MongoDatabase db;
+
+		if (dynamicDB == null) {
+			db = endpoint.getMongoDatabase();
+		} else {
+			db = endpoint.getMongoConnection().getDatabase(dynamicDB);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Dynamic database selected: {}", db.getName());
+		}
+		return db;
+	}
+
+	private String calculateCollectionName(Exchange exchange) {
+		if (!endpoint.isDynamicity()) {
+			return endpoint.getCollection();
+		}
+		String dynamicCollection = exchange.getIn().getHeader(COLLECTION, String.class);
+		if (dynamicCollection == null) {
+			return endpoint.getCollection();
+		}
+		return dynamicCollection;
+	}
+
+	private MongoCollection<Document> calculateCollection(Exchange exchange) {
+		// dynamic calculation is an option. In most cases it won't be used and we should not penalise all users with running this
+		// resolution logic on every Exchange if they won't be using this functionality at all
+		if (!endpoint.isDynamicity()) {
+			return endpoint.getMongoCollection()
+					.withWriteConcern(endpoint.getWriteConcern());
+		}
+
+		String dynamicDB = exchange.getIn().getHeader(DATABASE, String.class);
+		String dynamicCollection = exchange.getIn().getHeader(COLLECTION, String.class);
+
+		@SuppressWarnings("unchecked")
+		List<Bson> dynamicIndex = exchange.getIn().getHeader(COLLECTION_INDEX, List.class);
+
+		MongoCollection<Document> dbCol;
+
+		if (dynamicDB == null && dynamicCollection == null) {
+			dbCol = endpoint.getMongoCollection()
+					.withWriteConcern(endpoint.getWriteConcern());
+		} else {
+			MongoDatabase db = calculateDb(exchange);
+
+			if (dynamicCollection == null) {
+				dbCol = db.getCollection(endpoint.getCollection(), Document.class);
+			} else {
+				dbCol = db.getCollection(dynamicCollection, Document.class);
+
+				// on the fly add index
+				if (dynamicIndex == null) {
+					endpoint.ensureIndex(dbCol, endpoint.createIndex());
+				} else {
+					endpoint.ensureIndex(dbCol, dynamicIndex);
+				}
+			}
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Dynamic database and/or collection selected: {}->{}", endpoint.getDatabase(), endpoint.getCollection());
+		}
+		return dbCol;
+	}
+
+	@SuppressWarnings("rawtypes")
+	private List<Document> attemptConvertToList(List insertList, Exchange exchange) throws CamelMongoDbException {
+		List<Document> documentList = new ArrayList<>(insertList.size());
+		TypeConverter converter = exchange.getContext().getTypeConverter();
+		for (Object item : insertList) {
+			try {
+				Document document = converter.mandatoryConvertTo(Document.class, item);
+				documentList.add(document);
+			} catch (Exception e) {
+				throw new CamelMongoDbException("MongoDB operation = insert, Assuming List variant of MongoDB insert operation, but List contains non-Document items", e);
+			}
+		}
+		return documentList;
+	}
+
+	private boolean isWriteOperation(MongoDbOperation operation) {
+		return MongoDbComponent.WRITE_OPERATIONS.contains(operation);
+	}
+
+	private Processor wrap(Function<Exchange, Object> supplier, MongoDbOperation operation) {
+		return exchange -> {
+			Object result = supplier.apply(exchange);
+			copyHeaders(exchange);
+			moveBodyToOutIfResultIsReturnedAsHeader(exchange, operation);
+			processAndTransferResult(result, exchange, operation);
+		};
+	}
+
+	private void copyHeaders(Exchange exchange) {
+		MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false);
+	}
+
+	private void moveBodyToOutIfResultIsReturnedAsHeader(Exchange exchange, MongoDbOperation operation) {
+		if (isWriteOperation(operation) && endpoint.isWriteResultAsHeader()) {
+			exchange.getOut().setBody(exchange.getIn().getBody());
+		}
+	}
+
+	private void processAndTransferResult(Object result, Exchange exchange, MongoDbOperation operation) {
+		// determine where to set the WriteResult: as the OUT body or as an IN message header
+		if (isWriteOperation(operation) && endpoint.isWriteResultAsHeader()) {
+			exchange.getOut().setHeader(WRITERESULT, result);
+		} else {
+			exchange.getOut().setBody(result);
+		}
+	}
+
+	private Function<Exchange, Object> createDoGetColStats() {
+		return exch ->
+		calculateDb(exch).runCommand(createCollStatsCommand(calculateCollectionName(exch)));
+	}
+
+	private Function<Exchange, Object> createDoFindOneByQuery() {
+		return exch -> {
+			try {
+				MongoCollection<Document> dbCol = calculateCollection(exch);
+
+				Bson query = exch.getIn().getHeader(CRITERIA, Bson.class);
+				if (query == null) {
+					query = exch.getIn().getMandatoryBody(Bson.class);
+				}
+
+				Bson sortBy = exch.getIn().getHeader(SORT_BY, Bson.class);
+				Bson fieldFilter = exch.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
+
+				if (fieldFilter == null) {
+					fieldFilter = new Document();
+				}
+
+				if (sortBy == null) {
+					sortBy = new Document();
+				}
+
+				Document ret = dbCol.find(query).projection(fieldFilter).sort(sortBy).first();
+				exch.getOut().setHeader(RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
+				return ret;
+			} catch (InvalidPayloadException e) {
+				throw new CamelMongoDbException("Payload is not a Document", e);
+			}
+		};
+	}
+
+	private Function<Exchange, Object> createDoCount() {
+		return exch -> {
+			Bson query = exch.getIn().getHeader(CRITERIA, Bson.class);
+			if (query == null) {
+				query = exch.getIn().getBody(Bson.class);
+			}
+			if (query == null) {
+				query = new Document();
+			}
+			return calculateCollection(exch).count(query);
+		};
+	}
+
+	private Function<Exchange, Object> createDoFindAll() {
+		return exchange1 -> {
+			Iterable<Document> result;
+			MongoCollection<Document> dbCol = calculateCollection(exchange1);
+			// do not use getMandatoryBody, because if the body is empty we want to retrieve all objects in the collection
+			Bson query = exchange1.getIn().getHeader(CRITERIA, Bson.class);
+			// do not run around looking for a type converter unless there is a need for it
+			if (null == query && exchange1.getIn().getBody() != null) {
+				query = exchange1.getIn().getBody(Bson.class);
+			}
+			Bson fieldFilter = exchange1.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
+
+			// get the batch size and number to skip
+			Integer batchSize = exchange1.getIn().getHeader(BATCH_SIZE, Integer.class);
+			Integer numToSkip = exchange1.getIn().getHeader(NUM_TO_SKIP, Integer.class);
+			Integer limit = exchange1.getIn().getHeader(LIMIT, Integer.class);
+			Document sortBy = exchange1.getIn().getHeader(SORT_BY, Document.class);
+			FindIterable<Document> ret;
+			if (query == null && fieldFilter == null) {
+				ret = dbCol.find(new Document());
+			} else if (fieldFilter == null) {
+				ret = dbCol.find(query);
+			} else if (query != null) {
+				ret = dbCol.find(query).projection(fieldFilter);
+			} else {
+				ret = dbCol.find(new Document()).projection(fieldFilter);
+			}
+
+			if (sortBy != null) {
+				ret.sort(sortBy);
+			}
+
+			if (batchSize != null) {
+				ret.batchSize(batchSize);
+			}
+
+			if (numToSkip != null) {
+				ret.skip(numToSkip);
+			}
+
+			if (limit != null) {
+				ret.limit(limit);
+			}
+
+			if (!MongoDbOutputType.MongoIterable.equals(endpoint.getOutputType())) {
+				List<Document> documents = new ArrayList<>();
+				// iterate a single cursor and make sure that same cursor is the one that gets closed
+				try (MongoCursor<Document> cursor = ret.iterator()) {
+					cursor.forEachRemaining(documents::add);
+				}
+				exchange1.getOut().setHeader(RESULT_PAGE_SIZE, documents.size());
+				result = documents;
+			} else {
+				result = ret;
+			}
+			return result;
+		};
+	}
+
+	private Function<Exchange, Object> createDoInsert() {
+		return exchange1 -> {
+			MongoCollection<Document> dbCol = calculateCollection(exchange1);
+			boolean singleInsert = true;
+			Object insert = exchange1.getIn().getBody(Document.class);
+			// body could not be converted to Document, check to see if it's of type List<Document>
+			if (insert == null) {
+				insert = exchange1.getIn().getBody(List.class);
+				// if the body of type List was obtained, ensure that all items are of type Document and cast the List to List<Document>
+				if (insert != null) {
+					singleInsert = false;
+					insert = attemptConvertToList((List<?>) insert, exchange1);
+				} else {
+					throw new CamelMongoDbException("MongoDB operation = insert, Body is not convertible to type Document or List<Document>");
+				}
+			}
+
+			if (singleInsert) {
+				Document insertObject = Document.class.cast(insert);
+				dbCol.insertOne(insertObject);
+
+				exchange1.getIn().setHeader(OID, insertObject.get(MONGO_ID));
+			} else {
+				@SuppressWarnings("unchecked")
+				List<Document> insertObjects = (List<Document>) insert;
+				dbCol.insertMany(insertObjects);
+				List<Object> objectIdentification = new ArrayList<>(insertObjects.size());
+				objectIdentification.addAll(insertObjects.stream().map(insertObject -> insertObject.get(MONGO_ID)).collect(Collectors.toList()));
+				exchange1.getIn().setHeader(OID, objectIdentification);
+			}
+			return insert;
+		};
+	}
+
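+	// Update semantics: unless the CRITERIA header carries the filter, the IN body must be a
+	// two-element List<Bson> holding the filter document first and the update document second.
+	// The MULTIUPDATE header switches between updateOne() and updateMany(), and the UPSERT
+	// header is mapped onto UpdateOptions.upsert().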
+	private Function<Exchange, Object> createDoUpdate() {
+		return exchange1 -> {
+			try {
+				MongoCollection<Document> dbCol = calculateCollection(exchange1);
+
+				Bson updateCriteria = exchange1.getIn().getHeader(CRITERIA, Bson.class);
+				Bson objNew;
+				if (null == updateCriteria){
+					@SuppressWarnings("unchecked")
+					List<Bson> saveObj = exchange1.getIn().getMandatoryBody((Class<List<Bson>>) Class.class.cast(List.class));
+					if (saveObj.size() != 2) {
+						throw new CamelMongoDbException("MongoDB operation = update, failed because body is not a List of Document objects with size = 2");
+					}
+
+					updateCriteria = saveObj.get(0);
+					objNew = saveObj.get(1);
+				} else {
+					objNew = exchange1.getIn().getMandatoryBody(Bson.class);
+				}
+
+				Boolean multi = exchange1.getIn().getHeader(MULTIUPDATE, Boolean.class);
+				Boolean upsert = exchange1.getIn().getHeader(UPSERT, Boolean.class);
+
+				UpdateResult result;
+				UpdateOptions options = new UpdateOptions();
+				if (upsert != null) {
+					options.upsert(upsert);
+				}
+
+				if (multi == null || !multi) {
+					result = dbCol.updateOne(updateCriteria, objNew, options);
+				} else {
+					result = dbCol.updateMany(updateCriteria, objNew, options);
+				}
+				if (result.isModifiedCountAvailable()) {
+					exchange1.getOut().setHeader(RECORDS_AFFECTED, result.getModifiedCount());
+				}
+				exchange1.getOut().setHeader(RECORDS_MATCHED, result.getMatchedCount());
+				return result;
+			} catch (InvalidPayloadException e) {
+				throw new CamelMongoDbException("Invalid payload for update", e);
+			}
+		};
+	}
+
+	private Function<Exchange, Object> createDoRemove() {
+		return exchange1 -> {
+			try {
+				MongoCollection<Document> dbCol = calculateCollection(exchange1);
+				Document removeObj = exchange1.getIn().getMandatoryBody(Document.class);
+
+				DeleteResult result = dbCol.deleteMany(removeObj);
+				if (result.wasAcknowledged()) {
+					exchange1.getOut().setHeader(RECORDS_AFFECTED, result.getDeletedCount());
+				}
+				return result;
+			} catch (InvalidPayloadException e) {
+				throw new CamelMongoDbException("Invalid payload for remove", e);
+			}
+		};
+	}
+
+	private Function<Exchange, Object> createDoAggregate() {
+		return exchange1 -> {
+			try {
+				MongoCollection<Document> dbCol = calculateCollection(exchange1);
+
+				// The Java driver does not allow setting the batch size and number to skip for aggregations
+				List<Document> dbIterator = new ArrayList<>();
+				AggregateIterable<Document> aggregationResult;
+
+				@SuppressWarnings("unchecked")
+				List<Bson> query = exchange1.getIn().getMandatoryBody((Class<List<Bson>>) Class.class.cast(List.class));
+
+				// Allow body to be a pipeline
+				// @see http://docs.mongodb.org/manual/core/aggregation/
+				if (null != query) {
+					List<Bson> queryList = query.stream().map(o -> (Bson) o).collect(Collectors.toList());
+					aggregationResult = dbCol.aggregate(queryList);
+				} else {
+					List<Bson> queryList = new ArrayList<>();
+					queryList.add(Bson.class.cast(exchange1.getIn().getMandatoryBody(Bson.class)));
+					aggregationResult = dbCol.aggregate(queryList);
+				}
+				aggregationResult.iterator().forEachRemaining(dbIterator::add);
+				return dbIterator;
+			} catch (InvalidPayloadException e) {
+				throw new CamelMongoDbException("Invalid payload for aggregate", e);
+			}
+		};
+	}
+
+	private Function<Exchange, Object> createDoCommand() {
+		return exchange1 -> {
+			try {
+				MongoDatabase db = calculateDb(exchange1);
+				Document cmdObj = exchange1.getIn().getMandatoryBody(Document.class);
+				return db.runCommand(cmdObj);
+			} catch (InvalidPayloadException e) {
+				throw new CamelMongoDbException("Invalid payload for command", e);
+			}
+		};
+	}
+
+	private Function<Exchange, Object> createDoGetDbStats() {
+		return exchange1 -> calculateDb(exchange1).runCommand(createDbStatsCommand());
+	}
+
+	private Function<Exchange, Object> createDoFindById() {
+		return exchange1 -> {
+			try {
+				MongoCollection<Document> dbCol = calculateCollection(exchange1);
+				Object id = exchange1.getIn().getMandatoryBody();
+				Bson o = Filters.eq(MONGO_ID, id);
+				Document ret;
+
+				Bson fieldFilter = exchange1.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
+				if (fieldFilter == null) {
+					fieldFilter = new Document();
+				}
+				ret = dbCol.find(o).projection(fieldFilter).first();
+				exchange1.getOut().setHeader(RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
+				return ret;
+			} catch (InvalidPayloadException e) {
+				throw new CamelMongoDbException("Invalid payload for findById", e);
+			}
+		};
+	}
+
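+	// Save semantics: an upsert keyed on _id via replaceOne() with upsert(true). A body without
+	// an _id is matched against a filter that never matches, forcing an insert, and the generated
+	// id is propagated in the OID header; otherwise the existing document is replaced in place and
+	// the body's _id is returned.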
+	private Function<Exchange, Object> createDoSave() {
+		return exchange1 -> {
+			try {
+				MongoCollection<Document> dbCol = calculateCollection(exchange1);
+				Document saveObj = exchange1.getIn().getMandatoryBody(Document.class);
+				UpdateOptions options = new UpdateOptions().upsert(true);
+				UpdateResult result = null;
+				if (null == saveObj.get(MONGO_ID)) {
+					result = dbCol.replaceOne(Filters.where("false"), saveObj, options);
+					exchange1.getIn().setHeader(OID, result.getUpsertedId().asObjectId().getValue());
+				} else {
+					result = dbCol.replaceOne(eq(MONGO_ID, saveObj.get(MONGO_ID)), saveObj, options);
+					exchange1.getIn().setHeader(OID, saveObj.get(MONGO_ID));
+				}
+				return result;
+			} catch (InvalidPayloadException e) {
+				throw new CamelMongoDbException("Body incorrect type for save", e);
+			}
+		};
+	}
+}
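
The operation configured on the endpoint can be overridden per message through the OPERATION_HEADER constant, as process() shows above. A minimal Java DSL sketch, with the "myDb" connection bean and the flights/tickets names as illustrative assumptions, inside a RouteBuilder configure() method:

    from("direct:count")
        .setHeader(MongoDbConstants.OPERATION_HEADER, constant("count"))
        .to("mongodb3:myDb?database=flights&collection=tickets&operation=findAll");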

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java
new file mode 100644
index 0000000..4eba81f
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+public class MongoDbTailTrackingConfig {
+    
+    public static final String DEFAULT_COLLECTION = "camelTailTracking";
+    public static final String DEFAULT_FIELD = "lastTrackingValue";
+    
+    /**
+     * See {@link MongoDbEndpoint#setTailTrackIncreasingField(String)}
+     */
+    public final String increasingField;
+    /**
+     * See {@link MongoDbEndpoint#setPersistentTailTracking(boolean)}
+     */
+    public final boolean persistent;
+    /**
+     * See {@link MongoDbEndpoint#setTailTrackDb(String)}
+     */
+    public final String db;
+    /**
+     * See {@link MongoDbEndpoint#setTailTrackCollection(String)}
+     */
+    public final String collection;
+    /**
+     * See {@link MongoDbEndpoint#setTailTrackField(String)}
+     */
+    public final String field;
+    /**
+     * See {@link MongoDbEndpoint#setPersistentId(String)}
+     */
+    public final String persistentId;
+    
+    public MongoDbTailTrackingConfig(boolean persistentTailTracking, String tailTrackIncreasingField, String tailTrackDb,
+            String tailTrackCollection, String tailTrackField, String persistentId) {
+        this.increasingField = tailTrackIncreasingField;
+        this.persistent = persistentTailTracking;
+        this.db = tailTrackDb;
+        this.persistentId = persistentId;
+        this.collection = tailTrackCollection == null ? DEFAULT_COLLECTION : tailTrackCollection;
+        this.field = tailTrackField == null ? DEFAULT_FIELD : tailTrackField;
+    }
+}
\ No newline at end of file
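
The constructor falls back to DEFAULT_COLLECTION and DEFAULT_FIELD when no tail tracking collection or field is supplied. A minimal sketch, with the database name, increasing field and persistent id chosen purely for illustration:

    MongoDbTailTrackingConfig cfg = new MongoDbTailTrackingConfig(
            true,             // persistent tail tracking
            "departureTime",  // tailTrackIncreasingField
            "flights",        // tailTrackDb
            null,             // tailTrackCollection -> "camelTailTracking"
            null,             // tailTrackField -> "lastTrackingValue"
            "cancellationsFeed");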

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
new file mode 100644
index 0000000..e34643d
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Updates;
+
+public class MongoDbTailTrackingManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailTrackingManager.class);
+    
+    public Object lastVal;
+
+    private final MongoClient connection;
+    private final MongoDbTailTrackingConfig config;
+    private MongoCollection<Document> dbCol;
+    private Document trackingObj;
+    
+    public MongoDbTailTrackingManager(MongoClient connection, MongoDbTailTrackingConfig config) {
+        this.connection = connection;
+        this.config = config;
+    }
+    
+    public void initialize() throws Exception {
+        if (!config.persistent) {
+            return;
+        }
+        
+        dbCol = connection.getDatabase(config.db).getCollection(config.collection, Document.class);
+        Document filter = new Document("persistentId", config.persistentId);
+        trackingObj = dbCol.find(filter).first();
+        if (trackingObj == null) {
+            dbCol.insertOne(filter);
+            trackingObj = dbCol.find(filter).first();
+        }
+        // keep only the _id, the rest is useless and causes more overhead during update
+        trackingObj = new Document(MONGO_ID, trackingObj.get(MONGO_ID));
+    }
+    
+    public synchronized void persistToStore() {
+        if (!config.persistent || lastVal == null) {
+            return;
+        }
+        
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Persisting lastVal={} to store, collection: {}", lastVal, config.collection);
+        }
+        
+        Bson updateObj = Updates.set(config.field, lastVal);
+        dbCol.updateOne(trackingObj, updateObj);
+        trackingObj = dbCol.find().first();
+    }
+    
+    public synchronized Object recoverFromStore() {
+        if (!config.persistent) {
+            return null;
+        }
+        
+        lastVal = dbCol.find(trackingObj).first().get(config.field);
+        
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Recovered lastVal={} from store, collection: {}", lastVal, config.collection);
+        }
+        
+        return lastVal;
+    }
+    
+    public void setLastVal(Document dbObj) {
+        if (config.increasingField == null) {
+            return;
+        }
+        
+        lastVal = dbObj.get(config.increasingField);
+    }
+    
+    public String getIncreasingFieldName() {
+        return config.increasingField;
+    }
+}
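
A minimal sketch of the tracking lifecycle as the tailing process drives it; the "client" MongoClient, the "cfg" config and the "document" variable are assumed to exist, and initialize() declares throws Exception:

    MongoDbTailTrackingManager tracking = new MongoDbTailTrackingManager(client, cfg);
    tracking.initialize();        // loads or creates the tracking document when persistence is enabled
    tracking.recoverFromStore();  // populates lastVal from the store, or returns null when persistence is off
    // for every document consumed from the tailable cursor:
    tracking.setLastVal(document);
    // when the cursor loop ends or the consumer shuts down:
    tracking.persistToStore();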

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
new file mode 100644
index 0000000..69c8b7f
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+
+/**
+ * The MongoDb consumer.
+ */
+public class MongoDbTailableCursorConsumer extends DefaultConsumer {
+    private final MongoDbEndpoint endpoint;
+    private ExecutorService executor;
+    private MongoDbTailingProcess tailingProcess;
+
+    public MongoDbTailableCursorConsumer(MongoDbEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (tailingProcess != null) {
+            tailingProcess.stop();
+        }
+        if (executor != null) {
+            endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
+            executor = null;
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), 1);
+        MongoDbTailTrackingManager trackingManager = initTailTracking();
+        tailingProcess = new MongoDbTailingProcess(endpoint, this, trackingManager);
+        tailingProcess.initializeProcess();
+        executor.execute(tailingProcess);
+    }
+    
+    protected MongoDbTailTrackingManager initTailTracking() throws Exception {
+        MongoDbTailTrackingManager answer = new MongoDbTailTrackingManager(endpoint.getMongoConnection(), endpoint.getTailTrackingConfig());
+        answer.initialize();
+        return answer;
+    }
+    
+}
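
A minimal sketch of a tailable cursor consumer route with persistent tail tracking; the option names follow the existing camel-mongodb component, and the bean, database, collection and field names are illustrative:

    from("mongodb3:myDb?database=flights&collection=cancellations"
         + "&tailTrackIncreasingField=departureTime"
         + "&persistentTailTracking=true&persistentId=cancellationsFeed")
        .to("mock:cancellations");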

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
new file mode 100644
index 0000000..f51dd48
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.mongodb3;
+
+import static com.mongodb.client.model.Filters.gt;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.Exchange;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.CursorType;
+import com.mongodb.MongoCursorNotFoundException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+
+public class MongoDbTailingProcess implements Runnable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailingProcess.class);
+	private static final String CAPPED_KEY = "capped";
+
+	public volatile boolean keepRunning = true;
+	public volatile boolean stopped; // = false
+	private volatile CountDownLatch stoppedLatch;
+
+	private final MongoCollection<Document> dbCol;
+	private final MongoDbEndpoint endpoint;
+	private final MongoDbTailableCursorConsumer consumer;
+
+	// create local, final copies of these variables for increased performance
+	private final long cursorRegenerationDelay;
+	private final boolean cursorRegenerationDelayEnabled;
+
+	private MongoCursor<Document> cursor;
+	private MongoDbTailTrackingManager tailTracking;
+
+	public MongoDbTailingProcess(MongoDbEndpoint endpoint, MongoDbTailableCursorConsumer consumer,
+			MongoDbTailTrackingManager tailTrack) {
+		this.endpoint = endpoint;
+		this.consumer = consumer;
+		this.dbCol = endpoint.getMongoCollection();
+		this.tailTracking = tailTrack;
+		this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
+		this.cursorRegenerationDelayEnabled = !(this.cursorRegenerationDelay == 0);
+	}
+
+	public MongoCursor<Document> getCursor() {
+		return cursor;
+	}
+
+	/**
+	 * Initialise the tailing process, the cursor and if persistent tail
+	 * tracking is enabled, recover the cursor from the persisted point. As part
+	 * of the initialisation process, the component will validate that the
+	 * collection we are targeting is 'capped'.
+	 *
+	 * @throws Exception
+	 */
+	public void initializeProcess() throws Exception {
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}",
+					"db: " + endpoint.getMongoDatabase() + ", col: " + endpoint.getCollection());
+		}
+
+		if (!isCollectionCapped()) {
+			throw new CamelMongoDbException(
+					"Tailable cursors are only compatible with capped collections, and collection "
+							+ endpoint.getCollection() + " is not capped");
+		}
+		try {
+			// recover the last value from the store if it exists
+			tailTracking.recoverFromStore();
+			cursor = initializeCursor();
+		} catch (Exception e) {
+			throw new CamelMongoDbException("Exception occurred while initializing tailable cursor", e);
+		}
+
+		if (cursor == null) {
+			throw new CamelMongoDbException(
+					"Tailable cursor was not initialized, or cursor returned is dead on arrival");
+		}
+
+	}
+
+	private Boolean isCollectionCapped() {
+		return endpoint.getMongoDatabase().runCommand(createCollStatsCommand()).getBoolean(CAPPED_KEY);
+	}
+
+	private Document createCollStatsCommand() {
+		return new Document("collStats", endpoint.getCollection());
+	}
+
+	/**
+	 * The heart of the tailing process.
+	 */
+	@Override
+	public void run() {
+		stoppedLatch = new CountDownLatch(1);
+		while (keepRunning) {
+			doRun();
+			// if the previous call didn't return because we have stopped
+			// running, then regenerate the cursor
+			if (keepRunning) {
+				cursor.close();
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Regenerating cursor with lastVal: {}, waiting {}ms first", tailTracking.lastVal,
+							cursorRegenerationDelay);
+				}
+
+				if (cursorRegenerationDelayEnabled) {
+					try {
+						Thread.sleep(cursorRegenerationDelay);
+					} catch (InterruptedException e) {
+						// ignore
+					}
+				}
+
+				cursor = initializeCursor();
+			}
+		}
+
+		stopped = true;
+		stoppedLatch.countDown();
+	}
+
+	protected void stop() throws Exception {
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}",
+					"db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
+		}
+		keepRunning = false;
+		// close the cursor if it's open, so if it is blocked on hasNext() it
+		// will return immediately
+		if (cursor != null) {
+			cursor.close();
+		}
+		awaitStopped();
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}",
+					"db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
+		}
+	}
+
+	/**
+	 * The heart of the tailing process.
+	 */
+	private void doRun() {
+		// keep consuming while the cursor has more values and we have not been asked to stop;
+		// a cursor id of 0 would mean the cursor is dead and needs to be regenerated
+		try {
+			while (cursor.hasNext() && keepRunning) {
+				Document dbObj = cursor.next();
+				Exchange exchange = endpoint.createMongoDbExchange(dbObj);
+				try {
+					if (LOG.isTraceEnabled()) {
+						LOG.trace("Sending exchange: {}, ObjectId: {}", exchange, dbObj.get(MONGO_ID));
+					}
+					consumer.getProcessor().process(exchange);
+				} catch (Exception e) {
+					// do nothing
+				}
+				tailTracking.setLastVal(dbObj);
+			}
+		} catch (MongoCursorNotFoundException e) {
+			// we only log the warning if we are not stopping, otherwise it is
+			// expected because the stop() method kills the cursor just in case
+			// it is blocked
+			// waiting for more data to arrive
+			if (keepRunning) {
+				LOG.debug(
+						"Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.",
+						e);
+			}
+		}
+
+		// the loop finished, persist the lastValue just in case we are shutting
+		// down
+		// TODO: perhaps add a functionality to persist every N records
+		tailTracking.persistToStore();
+	}
+
+	// no arguments, will ask DB what the last updated Id was (checking
+	// persistent storage)
+	private MongoCursor<Document> initializeCursor() {
+		Object lastVal = tailTracking.lastVal;
+		// lastVal can be null if we are initializing and there is no
+		// persistence enabled
+		MongoCursor<Document> answer;
+		if (lastVal == null) {
+			answer = dbCol.find().cursorType(CursorType.TailableAwait).iterator();
+		} else {
+			// do not open this cursor in a try-with-resources block: it must remain open so that
+			// doRun() can consume it; it is closed when the process stops or regenerates the cursor
+			answer = dbCol.find(gt(tailTracking.getIncreasingFieldName(), lastVal)).cursorType(CursorType.TailableAwait).iterator();
+		}
+		return answer;
+	}
+
+	private void awaitStopped() throws InterruptedException {
+		if (!stopped) {
+			LOG.info("Going to wait for stopping");
+			stoppedLatch.await();
+		}
+	}
+
+}
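
initializeProcess() refuses to start against a non-capped collection, so the target collection has to be created as capped beforehand. A minimal sketch with the 3.x driver; the names and size are illustrative:

    MongoDatabase db = mongoClient.getDatabase("flights");
    db.createCollection("cancellations",
            new CreateCollectionOptions().capped(true).sizeInBytes(1 << 20));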

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java
new file mode 100644
index 0000000..920d11a
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3.converters;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.util.IOHelper;
+import org.bson.BsonArray;
+import org.bson.BsonValue;
+import org.bson.Document;
+import org.bson.codecs.BsonArrayCodec;
+import org.bson.codecs.BsonValueCodecProvider;
+import org.bson.codecs.DecoderContext;
+import org.bson.codecs.DocumentCodec;
+import org.bson.codecs.DocumentCodecProvider;
+import org.bson.codecs.ValueCodecProvider;
+import org.bson.codecs.configuration.CodecRegistries;
+import org.bson.codecs.configuration.CodecRegistry;
+import org.bson.conversions.Bson;
+import org.bson.json.JsonReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@Converter
+public final class MongoDbBasicConverters {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MongoDbBasicConverters.class);
+
+	// Jackson's ObjectMapper is thread-safe, so no need to create a pool nor synchronize access to it
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	private MongoDbBasicConverters() {
+	}
+
+	@Converter
+	public static Document fromMapToDocument(Map<String, Object> map) {
+		return new Document(map);
+	}
+
+	@Converter
+	public static Map<String, Object> fromDocumentToMap(Document document) {
+		return document;
+	}
+
+	@Converter
+	public static Document fromStringToDocument(String s) {
+		Document answer = null;
+		try {
+			answer = Document.parse(s);
+		} catch (Exception e) {
+			LOG.warn("String -> Document conversion selected, but the following exception occurred. Returning null.", e);
+		}
+
+		return answer;
+	}
+
+	@Converter
+	public static Document fromFileToDocument(File f, Exchange exchange) throws FileNotFoundException {
+		return fromInputStreamToDocument(new FileInputStream(f), exchange);
+	}
+
+	@Converter
+	public static Document fromInputStreamToDocument(InputStream is, Exchange exchange) {
+		Document answer = null;
+		try {
+			byte[] input = IOConverter.toBytes(is);
+
+			if (isBson(input)) {
+				JsonReader reader = new JsonReader(new String(input));
+				DocumentCodec documentReader = new DocumentCodec();
+
+				answer = documentReader.decode(reader, DecoderContext.builder().build());
+			} else {
+				answer = Document.parse(IOConverter.toString(input, exchange));
+			}
+		} catch (Exception e) {
+			LOG.warn("InputStream -> Document conversion selected, but the following exception occurred. Returning null.", e);
+		} finally {
+			// we need to make sure to close the input stream
+			IOHelper.close(is, "InputStream", LOG);
+		}
+		return answer;
+	}
+
+	/** 
+	 * If the input starts with any number of whitespace characters and then a '{' character, we
+	 * assume it is JSON rather than BSON. There are probably no useful BSON blobs that fit this pattern
+	 */
+	private static boolean isBson(byte[] input) {
+		int i = 0;
+		while (i < input.length) {
+			if (input[i] == '{') {
+				return false;
+			} else if (!Character.isWhitespace(input[i])) {
+				return true;
+			}
+			// skip leading whitespace and keep looking for the first significant byte
+			i++;
+		}
+		return true;
+	}
+
+	@Converter
+	public static Document fromAnyObjectToDocument(Object value) {
+		Document answer;
+		try {
+			@SuppressWarnings("unchecked")
+			Map<String, Object> m = OBJECT_MAPPER.convertValue(value, Map.class);
+			answer = new Document(m);
+		} catch (Exception e) {
+			LOG.warn("Conversion has fallen back to generic Object -> Document, but unable to convert type {}. Returning null. {}",
+					value.getClass().getCanonicalName(), e.getClass().getCanonicalName() + ": " + e.getMessage());
+			return null;
+		}
+		return answer;
+	}
+
+	@Converter
+	public static List<Bson> fromStringToList(String value) {
+
+		final CodecRegistry codecRegistry = CodecRegistries.fromProviders(Arrays.asList(
+				new ValueCodecProvider(),
+				new BsonValueCodecProvider(),
+				new DocumentCodecProvider()));
+
+		JsonReader reader = new JsonReader(value);
+		BsonArrayCodec arrayReader = new BsonArrayCodec(codecRegistry);
+
+		BsonArray docArray = arrayReader.decode(reader, DecoderContext.builder().build());
+		
+		List<Bson> answer = new ArrayList<>(docArray.size());
+
+		for (BsonValue doc : docArray) {
+			answer.add(doc.asDocument());
+		}
+		return answer;
+	}
+
+}
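
These converters are resolved through the Camel type converter registry, for example when a route body needs to become a Document or an aggregation pipeline. A minimal sketch; the JSON payloads are illustrative:

    Document doc = camelContext.getTypeConverter()
            .convertTo(Document.class, "{ \"flight\" : \"BA123\", \"seats\" : 42 }");

    @SuppressWarnings("unchecked")
    List<Bson> pipeline = camelContext.getTypeConverter()
            .convertTo(List.class, "[{ \"$match\" : { \"seats\" : { \"$gt\" : 10 } } }]");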

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java
new file mode 100644
index 0000000..40042b5
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3.processor.idempotent;
+
+import static com.mongodb.client.model.Filters.eq;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import com.mongodb.ErrorCategory;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.result.DeleteResult;
+
+@ManagedResource(description = "Mongo db based message id repository")
+public class MongoDbIdempotentRepository<E> extends ServiceSupport implements IdempotentRepository<E> {
+    private MongoClient mongoClient;
+    private String collectionName;
+    private String dbName;
+    private MongoCollection<Document> collection;
+
+    public MongoDbIdempotentRepository() {
+    }
+
+    public MongoDbIdempotentRepository(MongoClient mongoClient, String collectionName, String dbName) {
+        this.mongoClient = mongoClient;
+        this.collectionName = collectionName;
+        this.dbName = dbName;
+        this.collection = mongoClient.getDatabase(dbName).getCollection(collectionName);
+    }
+
+    @ManagedOperation(description = "Adds the key to the store")
+    @Override
+    public boolean add(E key) {
+        Document document = new Document(MONGO_ID, key);
+        try {
+            collection.insertOne(document);
+        } catch (com.mongodb.MongoWriteException ex) {
+            if (ex.getError().getCategory() == ErrorCategory.DUPLICATE_KEY) {
+                return false;
+            }
+            throw ex;
+        }
+        return true;
+    }
+
+    @ManagedOperation(description = "Does the store contain the given key")
+    @Override
+    public boolean contains(E key) {
+        Bson document = eq(MONGO_ID, key);
+        long count =  collection.count(document);
+        return count > 0;
+    }
+
+    @ManagedOperation(description = "Remove the key from the store")
+    @Override
+    public boolean remove(E key) {
+        Bson document = eq(MONGO_ID, key);
+        DeleteResult res = collection.deleteOne(document);
+        return  res.getDeletedCount() > 0;
+    }
+
+    @Override
+    public boolean confirm(E key) {
+        return true;
+    }
+
+    @ManagedOperation(description = "Clear the store")
+    @Override
+    public void clear() {
+        collection.deleteMany(new Document());
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(mongoClient, "mongoClient");
+        ObjectHelper.notNull(dbName, "dbName");
+        ObjectHelper.notNull(collectionName, "collectionName");
+
+        if (collection == null) {
+            this.collection = mongoClient.getDatabase(dbName).getCollection(collectionName);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        return;
+    }
+
+    public MongoClient getMongoClient() {
+        return mongoClient;
+    }
+
+    public void setMongoClient(MongoClient mongoClient) {
+        this.mongoClient = mongoClient;
+    }
+
+    public String getCollectionName() {
+        return collectionName;
+    }
+
+    public void setCollectionName(String collectionName) {
+        this.collectionName = collectionName;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public void setDbName(String dbName) {
+        this.dbName = dbName;
+    }
+}
+
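
A minimal sketch of plugging the repository into an idempotent consumer EIP; the "mongoClient" bean and the flights/processedTickets names are illustrative, and the route belongs inside a RouteBuilder configure() method:

    MongoDbIdempotentRepository<String> repository =
            new MongoDbIdempotentRepository<>(mongoClient, "processedTickets", "flights");

    from("direct:tickets")
        .idempotentConsumer(header("ticketId"), repository)
        .to("mock:uniqueTickets");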

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/resources/META-INF/LICENSE.txt
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/resources/META-INF/LICENSE.txt b/components/camel-mongodb3/src/main/resources/META-INF/LICENSE.txt
new file mode 100644
index 0000000..6b0b127
--- /dev/null
+++ b/components/camel-mongodb3/src/main/resources/META-INF/LICENSE.txt
@@ -0,0 +1,203 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/resources/META-INF/NOTICE.txt
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/resources/META-INF/NOTICE.txt b/components/camel-mongodb3/src/main/resources/META-INF/NOTICE.txt
new file mode 100644
index 0000000..2e215bf
--- /dev/null
+++ b/components/camel-mongodb3/src/main/resources/META-INF/NOTICE.txt
@@ -0,0 +1,11 @@
+   =========================================================================
+   ==  NOTICE file corresponding to the section 4 d of                    ==
+   ==  the Apache License, Version 2.0,                                   ==
+   ==  in this case for the Apache Camel distribution.                    ==
+   =========================================================================
+
+   This product includes software developed by
+   The Apache Software Foundation (http://www.apache.org/).
+
+   Please read the different LICENSE files present in the licenses directory of
+   this distribution.

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
new file mode 100644
index 0000000..c9afc68
--- /dev/null
+++ b/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.camel.component.mongodb3.converters.MongoDbBasicConverters
\ No newline at end of file

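The TypeConverter service file above is how Camel discovers the component's converters at runtime: any class listed there is scanned for @Converter methods when the classpath is loaded. As a rough illustration only (the methods below are hypothetical and not the actual contents of MongoDbBasicConverters), a converter class registered this way typically looks like:

package org.apache.camel.component.mongodb3.converters;

import java.util.Map;

import org.apache.camel.Converter;
import org.bson.Document;

// Hypothetical sketch of a converter class registered through the
// META-INF/services/org/apache/camel/TypeConverter file; the real
// MongoDbBasicConverters in this commit may expose different conversions.
@Converter
public final class ExampleDocumentConverters {

    private ExampleDocumentConverters() {
    }

    @Converter
    public static Document fromMapToDocument(Map<String, Object> map) {
        // org.bson.Document implements Map<String, Object>, so it can wrap the map directly
        return new Document(map);
    }

    @Converter
    public static String fromDocumentToString(Document document) {
        // render the Document as its JSON representation
        return document.toJson();
    }
}
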
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/component/mongodb3
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/component/mongodb3 b/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/component/mongodb3
new file mode 100644
index 0000000..ba9679f
--- /dev/null
+++ b/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/component/mongodb3
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.mongodb3.MongoDbComponent

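The component service file above (class=org.apache.camel.component.mongodb3.MongoDbComponent) is what binds the mongodb3: URI scheme to the new component. A minimal sketch of a route using that scheme, assuming a MongoClient bean named myDb is in the registry and reusing the database/collection/operation options exercised by the tests later in this commit:

import org.apache.camel.builder.RouteBuilder;

// Illustrative only: the endpoint options mirror the insert routes defined in
// MongoDbConversionsTest; "myDb" is assumed to be a com.mongodb.MongoClient
// bean available in the Camel registry.
public class MongoDb3InsertRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:insert")
            .to("mongodb3:myDb?database=test&collection=camelTest&operation=insert");
    }
}
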
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java
new file mode 100644
index 0000000..52bdd6d
--- /dev/null
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import java.util.Formatter;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.component.mongodb3.CamelMongoDbException;
+import org.apache.camel.component.properties.PropertiesComponent;
+import org.apache.camel.spring.SpringCamelContext;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.bson.Document;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+
+
+public abstract class AbstractMongoDbTest extends CamelTestSupport {
+
+    protected static MongoClient mongo;
+    protected static MongoDatabase db;
+    protected static MongoCollection<Document> testCollection;
+    protected static MongoCollection<Document> dynamicCollection;
+    
+    protected static String dbName = "test";
+    protected static String testCollectionName;
+    protected static String dynamicCollectionName;
+
+    protected ApplicationContext applicationContext;
+
+    @Override
+    public void doPostSetup() {
+        mongo = applicationContext.getBean("myDb", MongoClient.class);
+        db = mongo.getDatabase(dbName);
+
+        // Refresh the test collection - drop it and recreate it. We don't do this for the database because MongoDB would create large
+        // store files each time
+        testCollectionName = "camelTest";
+        testCollection = db.getCollection(testCollectionName, Document.class);
+        testCollection.drop();
+        testCollection = db.getCollection(testCollectionName, Document.class);
+        
+        dynamicCollectionName = testCollectionName.concat("Dynamic");
+        dynamicCollection = db.getCollection(dynamicCollectionName, Document.class);
+        dynamicCollection.drop();
+        dynamicCollection = db.getCollection(dynamicCollectionName, Document.class);
+
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        testCollection.drop();
+        dynamicCollection.drop();
+
+        super.tearDown();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        applicationContext = new AnnotationConfigApplicationContext(EmbedMongoConfiguration.class);
+        @SuppressWarnings("deprecation")
+        CamelContext ctx = SpringCamelContext.springCamelContext(applicationContext);
+        PropertiesComponent pc = new PropertiesComponent("classpath:mongodb.test.properties");
+        ctx.addComponent("properties", pc);
+        return ctx;
+    }
+
+    protected void pumpDataIntoTestCollection() {
+        // there should be 100 of each
+        String[] scientists = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
+        for (int i = 1; i <= 1000; i++) {
+            int index = i % scientists.length;
+            Formatter f = new Formatter();
+            String doc = f.format("{\"_id\":\"%d\", \"scientist\":\"%s\", \"fixedField\": \"fixedValue\"}", i, scientists[index]).toString();
+            IOHelper.close(f);
+            testCollection.insertOne(Document.parse(doc));
+        }
+        assertEquals("Data pumping of 1000 entries did not complete entirely", 1000L, testCollection.count());
+    }
+
+    protected CamelMongoDbException extractAndAssertCamelMongoDbException(Object result, String message) {
+        assertTrue("Result is not an Exception", result instanceof Throwable);
+        assertTrue("Result is not a CamelExecutionException", result instanceof CamelExecutionException);
+        Throwable exc = ((CamelExecutionException) result).getCause();
+        assertTrue("Result is not a CamelMongoDbException", exc instanceof CamelMongoDbException);
+        CamelMongoDbException camelExc = ObjectHelper.cast(CamelMongoDbException.class, exc);
+        if (message != null) {
+            assertTrue("CamelMongoDbException doesn't contain desired message string", camelExc.getMessage().contains(message));
+        }
+        return camelExc;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
new file mode 100644
index 0000000..03b34c3
--- /dev/null
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import static com.mongodb.MongoClientOptions.builder;
+import static de.flapdoodle.embed.mongo.distribution.Version.Main.PRODUCTION;
+import static de.flapdoodle.embed.process.runtime.Network.localhostIsIPv6;
+import static org.springframework.util.SocketUtils.findAvailableTcpPort;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.ReadPreference;
+import com.mongodb.WriteConcern;
+
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.IMongodConfig;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+
+@Configuration
+public class EmbedMongoConfiguration {
+
+    private static final int PORT = findAvailableTcpPort();
+
+    static {
+        try {
+            IMongodConfig mongodConfig = new MongodConfigBuilder()
+                    .version(PRODUCTION)
+                    .net(new Net(PORT, localhostIsIPv6()))
+                    .build();
+            MongodExecutable mongodExecutable = MongodStarter.getDefaultInstance().prepare(mongodConfig);
+            mongodExecutable.start();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Bean
+    public MongoClient myDb() throws UnknownHostException {
+        return new MongoClient("0.0.0.0", PORT);
+    }
+
+    @Bean
+    public MongoClient myDbP() throws UnknownHostException {
+        MongoClientURI uri = new MongoClientURI("mongodb://localhost:" + PORT, builder().readPreference(ReadPreference.primary()));
+        return new MongoClient(uri);
+    }
+
+    @Bean
+    public MongoClient myDbPP() throws UnknownHostException {
+        MongoClientURI uri = new MongoClientURI("mongodb://localhost:" + PORT, builder().readPreference(ReadPreference.primaryPreferred()));
+        return new MongoClient(uri);
+    }
+
+    @Bean
+    public MongoClient myDbS() throws UnknownHostException {
+        MongoClientURI uri = new MongoClientURI("mongodb://localhost:" + PORT, builder().readPreference(ReadPreference.secondary()));
+        return new MongoClient(uri);
+    }
+
+    @Bean
+    public MongoClient myDbWCA() throws UnknownHostException {
+        MongoClientURI uri = new MongoClientURI("mongodb://localhost:" + PORT, builder().writeConcern(WriteConcern.ACKNOWLEDGED));
+        return new MongoClient(uri);
+    }
+
+    @Bean
+    public MongoClient myDbSP() throws UnknownHostException {
+        MongoClientURI uri = new MongoClientURI("mongodb://localhost:" + PORT, builder().readPreference(ReadPreference.secondaryPreferred()));
+        return new MongoClient(uri);
+    }
+
+    @Bean
+    public MongoClient myDbN() throws UnknownHostException {
+        MongoClientURI uri = new MongoClientURI("mongodb://localhost:" + PORT, builder().readPreference(ReadPreference.nearest()));
+        return new MongoClient(uri);
+    }
+}
\ No newline at end of file

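Each bean in EmbedMongoConfiguration wraps the same embedded mongod but with a different ReadPreference or WriteConcern, so tests can select driver behaviour simply by naming the bean in the endpoint URI. A short sketch, assuming the findAll operation carried over from the existing camel-mongodb component:

import org.apache.camel.builder.RouteBuilder;

// Illustrative only: "myDbS" is the secondary ReadPreference client declared
// above; the operation name is assumed from the pre-existing camel-mongodb
// component rather than confirmed against this diff.
public class ReadFromSecondaryRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:readFromSecondary")
            .to("mongodb3:myDbS?database=test&collection=camelTest&operation=findAll");
    }
}
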
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java
new file mode 100644
index 0000000..33b7eeb
--- /dev/null
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.ImportResource;
+
+@Configuration
+@Import(EmbedMongoConfiguration.class)
+@ImportResource("org/apache/camel/component/mongodb3/mongoBasicOperationsTest.xml")
+public class MongoBasicOperationsConfiguration {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java
new file mode 100644
index 0000000..abb265c
--- /dev/null
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import static com.mongodb.client.model.Filters.eq;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.converter.IOConverter;
+import org.bson.Document;
+import org.junit.Test;
+
+public class MongoDbConversionsTest extends AbstractMongoDbTest {
+
+    @Test
+    public void testInsertMap() throws InterruptedException {
+        assertEquals(0, testCollection.count());
+        
+        Map<String, Object> m1 = new HashMap<>();
+        Map<String, String> m1Nested = new HashMap<>();
+
+        m1Nested.put("nested1", "nestedValue1");
+        m1Nested.put("nested2", "nestedValue2");
+        
+        m1.put("field1", "value1");
+        m1.put("field2", "value2");
+        m1.put("nestedField", m1Nested);
+        m1.put(MONGO_ID, "testInsertMap");
+
+        template.requestBody("direct:insertMap", m1);
+
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertMap")).first();
+        assertNotNull("No record with 'testInsertMap' _id", b);
+
+    }
+    
+    @Test
+    public void testInsertPojo() {
+        assertEquals(0, testCollection.count());
+        template.requestBody("direct:insertPojo", new MyPojoTest());
+
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertPojo")).first();
+        assertNotNull("No record with 'testInsertPojo' _id", b);
+    }
+    
+    @Test
+    public void testInsertJsonString() {
+        assertEquals(0, testCollection.count());
+        template.requestBody("direct:insertJsonString",
+                "{\"fruits\": [\"apple\", \"banana\", \"papaya\"], \"veggie\": \"broccoli\", \"_id\": \"testInsertJsonString\"}");
+
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertJsonString")).first();
+        assertNotNull("No record with 'testInsertJsonString' _id", b);
+    }
+    
+    @Test
+    public void testInsertJsonInputStream() throws Exception {
+        assertEquals(0, testCollection.count());
+        template.requestBody("direct:insertJsonString",
+                IOConverter.toInputStream("{\"fruits\": [\"apple\", \"banana\"], \"veggie\": \"broccoli\", \"_id\": \"testInsertJsonString\"}\n", null));
+
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertJsonString")).first();
+        assertNotNull("No record with 'testInsertJsonString' _id", b);
+    }
+    
+    @Test
+    public void testInsertBsonInputStream() {
+        assertEquals(0, testCollection.count());
+        
+        Document document = new Document(MONGO_ID, "testInsertBsonString");
+        
+        template.requestBody("direct:insertJsonString", new ByteArrayInputStream(document.toJson().getBytes()));
+
+        Document b = testCollection.find(eq(MONGO_ID, "testInsertBsonString")).first();
+        assertNotNull("No record with 'testInsertBsonString' _id", b);
+    }
+    
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                
+                from("direct:insertMap").to("mongodb3:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+                from("direct:insertPojo").to("mongodb3:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+                from("direct:insertJsonString").to("mongodb3:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+                from("direct:insertJsonStringWriteResultInString").to("mongodb3:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert").convertBodyTo(String.class);
+
+            }
+        };
+    }
+    
+    @SuppressWarnings("unused")
+    private class MyPojoTest {
+        public int number = 123;
+        public String text = "hello";
+        public String[] array = {"daVinci", "copernico", "einstein"};
+        // CHECKSTYLE:OFF
+        public String _id = "testInsertPojo";
+        // CHECKSTYLE:ON
+    }
+    
+}