You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/12/18 09:07:20 UTC
[04/13] camel git commit: CAMEL-10554 - camel-mongodb evolution to
driver 3. Merging the component. Thanks to Jean-Yves Terrien for the
contribution.
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbOutputType.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbOutputType.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbOutputType.java
new file mode 100644
index 0000000..694e054
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbOutputType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+public enum MongoDbOutputType {
+ DocumentList, // output is a List<Document>
+ Document, // output is a single Document
+ MongoIterable // output is a MongoIterable
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
new file mode 100644
index 0000000..d1f04c2
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import static com.mongodb.client.model.Filters.eq;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.BATCH_SIZE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.COLLECTION;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.COLLECTION_INDEX;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.DATABASE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.CRITERIA;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.FIELDS_PROJECTION;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.LIMIT;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MULTIUPDATE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.NUM_TO_SKIP;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.OID;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.OPERATION_HEADER;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.RECORDS_AFFECTED;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.RECORDS_MATCHED;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.RESULT_PAGE_SIZE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.RESULT_TOTAL_SIZE;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.SORT_BY;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.UPSERT;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.WRITERESULT;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Processor;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
+//import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.client.AggregateIterable;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.result.DeleteResult;
+import com.mongodb.client.result.UpdateResult;
+
+/**
+ * The MongoDb producer.
+ */
+public class MongoDbProducer extends DefaultProducer {
+ private static final Logger LOG = LoggerFactory.getLogger(MongoDbProducer.class);
+ private final Map<MongoDbOperation, Processor> operations = new HashMap<>(); // operation -> processor, looked up by invokeOperation
+ private MongoDbEndpoint endpoint;
+
+ { // instance initializer: registers one wrapped processor for each supported operation
+ bind(MongoDbOperation.aggregate, createDoAggregate());
+ bind(MongoDbOperation.command, createDoCommand());
+ bind(MongoDbOperation.count, createDoCount());
+ bind(MongoDbOperation.findAll, createDoFindAll());
+ bind(MongoDbOperation.findById, createDoFindById());
+ bind(MongoDbOperation.findOneByQuery, createDoFindOneByQuery());
+ bind(MongoDbOperation.getColStats, createDoGetColStats());
+ bind(MongoDbOperation.getDbStats, createDoGetDbStats());
+ bind(MongoDbOperation.insert, createDoInsert());
+ bind(MongoDbOperation.remove, createDoRemove());
+ bind(MongoDbOperation.save, createDoSave());
+ bind(MongoDbOperation.update, createDoUpdate());
+ }
+
+ public MongoDbProducer(MongoDbEndpoint endpoint) {
+ super(endpoint);
+ this.endpoint = endpoint; // kept with its concrete type so the operations can read the MongoDB settings
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ // Start from the endpoint's configured operation; a message header may override it.
+ MongoDbOperation selected = endpoint.getOperation();
+ Object override = exchange.getIn().getHeader(OPERATION_HEADER);
+ if (override != null) {
+ LOG.debug("Overriding default operation with operation specified on header: {}", override);
+ try {
+ // an enum instance is used directly, anything else is resolved by name from its String form
+ selected = override instanceof MongoDbOperation
+ ? ObjectHelper.cast(MongoDbOperation.class, override)
+ : MongoDbOperation.valueOf(exchange.getIn().getHeader(OPERATION_HEADER, String.class));
+ } catch (Exception cause) {
+ throw new CamelMongoDbException("Operation specified on header is not supported. Value: " + override, cause);
+ }
+ }
+
+ // failures of the operation itself are rethrown wrapped by the component
+ try {
+ invokeOperation(selected, exchange);
+ } catch (Exception cause) {
+ throw MongoDbComponent.wrapInCamelMongoDbException(cause);
+ }
+
+ }
+
+ /**
+ * Looks up the processor bound to the given MongoDB operation and runs it.
+ *
+ * @param operation the operation to execute
+ * @param exchange the exchange handed to the operation's processor
+ */
+ protected void invokeOperation(MongoDbOperation operation, Exchange exchange) throws Exception {
+ Processor bound = operations.get(operation);
+ if (bound == null) {
+ throw new CamelMongoDbException("Operation not supported. Value: " + operation);
+ }
+ // a binding exists, so hand the exchange over to it
+ bound.process(exchange);
+ }
+
+ private MongoDbProducer bind(MongoDbOperation operation, Function<Exchange, Object> mongoDbFunction) { // registers the function, wrapped as a Processor, under the operation
+ operations.put(operation, wrap(mongoDbFunction, operation));
+ return this; // returns this producer
+ }
+
+ // ----------- MongoDB operations ----------------
+
+ private Document createDbStatsCommand() {
+ return new Document("dbStats", 1).append("scale", 1); // command document {dbStats: 1, scale: 1}
+ }
+
+ private Document createCollStatsCommand(String collectionName) {
+ return new Document("collStats", collectionName); // command document {collStats: <collectionName>}
+ }
+
+
+ // --------- Convenience methods -----------------------
+ private MongoDatabase calculateDb(Exchange exchange) {
+ // Header-driven database selection is opt-in. With dynamicity switched off the endpoint's
+ // database is returned straight away, so the header lookup is skipped entirely.
+ if (!endpoint.isDynamicity()) {
+ return endpoint.getMongoDatabase();
+ }
+
+ // a DATABASE header names the database to use; without it the endpoint's database is kept
+ String requestedDb = exchange.getIn().getHeader(DATABASE, String.class);
+ MongoDatabase resolved;
+ if (requestedDb != null) {
+ resolved = endpoint.getMongoConnection().getDatabase(requestedDb);
+ } else {
+ resolved = endpoint.getMongoDatabase();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dynamic database selected: {}", resolved.getName());
+ }
+ return resolved;
+ }
+
+ private String calculateCollectionName(Exchange exchange) {
+ // the COLLECTION header is only consulted when dynamicity is enabled on the endpoint
+ String fromHeader = null;
+ if (endpoint.isDynamicity()) {
+ fromHeader = exchange.getIn().getHeader(COLLECTION, String.class);
+ }
+
+ // no usable header value: fall back to the collection configured on the endpoint
+ return fromHeader != null ? fromHeader : endpoint.getCollection();
+ }
+
+ private MongoCollection<Document> calculateCollection(Exchange exchange) {
+ // Resolves the collection to operate on. Dynamic (header driven) resolution is optional: when it is switched off the
+ // endpoint's own collection is returned at once, so exchanges that never use the feature skip the header lookups.
+ if (!endpoint.isDynamicity()) {
+ return endpoint.getMongoCollection()
+ .withWriteConcern(endpoint.getWriteConcern());
+ }
+
+ String dynamicDB = exchange.getIn().getHeader(DATABASE, String.class);
+ String dynamicCollection = exchange.getIn().getHeader(COLLECTION, String.class);
+
+ @SuppressWarnings("unchecked")
+ List<Bson> dynamicIndex = exchange.getIn().getHeader(COLLECTION_INDEX, List.class);
+
+ MongoCollection<Document> dbCol;
+
+ if (dynamicDB == null && dynamicCollection == null) {
+ dbCol = endpoint.getMongoCollection()
+ .withWriteConcern(endpoint.getWriteConcern());
+ } else {
+ MongoDatabase db = calculateDb(exchange);
+ // NOTE(review): collections resolved from headers do not get the endpoint's write concern applied - confirm this is intended
+ if (dynamicCollection == null) {
+ dbCol = db.getCollection(endpoint.getCollection(), Document.class);
+ } else {
+ dbCol = db.getCollection(dynamicCollection, Document.class);
+
+ // on the fly add index
+ if (dynamicIndex == null) {
+ endpoint.ensureIndex(dbCol, endpoint.createIndex());
+ } else {
+ endpoint.ensureIndex(dbCol, dynamicIndex);
+ }
+ }
+ }
+ // log the namespace that was actually resolved, not the endpoint's configured defaults
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dynamic database and/or collection selected: {}->{}", dbCol.getNamespace().getDatabaseName(), dbCol.getNamespace().getCollectionName());
+ }
+ return dbCol;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private List<Document> attemptConvertToList(List insertList, Exchange exchange) throws CamelMongoDbException {
+ // every item must convert to a Document; the first one that does not aborts the whole conversion
+ List<Document> converted = new ArrayList<>(insertList.size());
+ TypeConverter typeConverter = exchange.getContext().getTypeConverter();
+ for (Object element : insertList) {
+ try {
+ converted.add(typeConverter.mandatoryConvertTo(Document.class, element));
+ } catch (Exception e) {
+ throw new CamelMongoDbException("MongoDB operation = insert, Assuming List variant of MongoDB insert operation, but List contains non-Document items", e);
+ }
+ }
+ return converted;
+ }
+
+ private boolean isWriteOperation(MongoDbOperation operation) {
+ return MongoDbComponent.WRITE_OPERATIONS.contains(operation); // true when the component lists the operation in WRITE_OPERATIONS
+ }
+
+ private Processor wrap(Function<Exchange, Object> supplier, MongoDbOperation operation) {
+ return exchange -> {
+ Object result = supplier.apply(exchange); // run the MongoDB operation first
+ copyHeaders(exchange); // then carry the IN headers over to the OUT message
+ moveBodyToOutIfResultIsReturnedAsHeader(exchange, operation);
+ processAndTransferResult(result, exchange, operation); // finally publish the result as OUT body or OUT header
+ };
+ }
+
+ private void copyHeaders(Exchange exchange) {
+ MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false); // IN -> OUT; the false flag presumably means existing OUT headers are not overridden - TODO confirm
+ }
+
+ private void moveBodyToOutIfResultIsReturnedAsHeader(Exchange exchange, MongoDbOperation operation) {
+ if (isWriteOperation(operation) && endpoint.isWriteResultAsHeader()) { // the result will travel in a header, so the OUT body takes the IN body
+ exchange.getOut().setBody(exchange.getIn().getBody());
+ }
+ }
+
+ private void processAndTransferResult(Object result, Exchange exchange, MongoDbOperation operation) {
+ // a write result goes into the WRITERESULT header of the OUT message when the endpoint asks for it, otherwise into the OUT body
+ if (!isWriteOperation(operation) || !endpoint.isWriteResultAsHeader()) {
+ exchange.getOut().setBody(result);
+ return;
+ }
+ exchange.getOut().setHeader(WRITERESULT, result);
+ }
+
+ private Function<Exchange, Object> createDoGetColStats() {
+ return exch -> // runs the collStats command for the resolved collection name against the resolved database
+ calculateDb(exch).runCommand(createCollStatsCommand(calculateCollectionName(exch)));
+ }
+
+ private Function<Exchange, Object> createDoFindOneByQuery() {
+ // findOneByQuery: returns the first document matching the filter, or null when there is none
+ return exch -> {
+ try {
+ MongoCollection<Document> collection = calculateCollection(exch);
+
+ // the filter comes from the CRITERIA header or, failing that, from the mandatory body
+ Bson filter = exch.getIn().getHeader(CRITERIA, Bson.class);
+ if (filter == null) {
+ filter = exch.getIn().getMandatoryBody(Bson.class);
+ }
+
+ // missing sort / projection headers are replaced by empty documents
+ Bson sortHeader = exch.getIn().getHeader(SORT_BY, Bson.class);
+ Bson projectionHeader = exch.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
+ Bson projection = projectionHeader != null ? projectionHeader : new Document();
+ Bson ordering = sortHeader != null ? sortHeader : new Document();
+
+ Document found = collection.find(filter).projection(projection).sort(ordering).first();
+
+ // RESULT_TOTAL_SIZE is 1 when a document was found and 0 otherwise
+ exch.getOut().setHeader(RESULT_TOTAL_SIZE, found == null ? 0 : 1);
+ return found;
+ } catch (InvalidPayloadException e) {
+ // raised by getMandatoryBody when the body cannot be used as the filter
+ throw new CamelMongoDbException("Payload is no Document", e);
+ }
+ };
+ }
+
+ private Function<Exchange, Object> createDoCount() {
+ return exch -> {
+ // filter lookup order: CRITERIA header, then the body, then an empty document
+ Bson filter = exch.getIn().getHeader(CRITERIA, Bson.class);
+ if (filter == null) {
+ Bson fromBody = exch.getIn().getBody(Bson.class);
+ filter = fromBody != null ? fromBody : new Document();
+ }
+ MongoCollection<Document> collection = calculateCollection(exch);
+ return collection.count(filter);
+ };
+ }
+
+ private Function<Exchange, Object> createDoFindAll() {
+ // findAll: runs a find using the optional filter, projection, sort, batch size, skip and limit supplied
+ // by the message and returns either a materialised List<Document> or the driver's FindIterable
+ return exchange1 -> {
+ Iterable<Document> result;
+ MongoCollection<Document> dbCol = calculateCollection(exchange1);
+ // do not use getMandatoryBody, because if the body is empty we want to retrieve all objects in the collection
+ Bson query = exchange1.getIn().getHeader(CRITERIA, Bson.class);
+ // do not run around looking for a type converter unless there is a need for it
+ if (null == query && exchange1.getIn().getBody() != null) {
+ query = exchange1.getIn().getBody(Bson.class);
+ }
+ Bson fieldFilter = exchange1.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
+
+ // get the batch size and number to skip
+ Integer batchSize = exchange1.getIn().getHeader(BATCH_SIZE, Integer.class);
+ Integer numToSkip = exchange1.getIn().getHeader(NUM_TO_SKIP, Integer.class);
+ Integer limit = exchange1.getIn().getHeader(LIMIT, Integer.class);
+ Document sortBy = exchange1.getIn().getHeader(SORT_BY, Document.class);
+
+ // no filter means an empty filter document; the projection is only applied when one was supplied
+ FindIterable<Document> ret = dbCol.find(query != null ? query : new Document());
+ if (fieldFilter != null) {
+ ret = ret.projection(fieldFilter);
+ }
+
+ // keep the value returned by each fluent call instead of relying on in-place mutation
+ if (sortBy != null) {
+ ret = ret.sort(sortBy);
+ }
+
+ if (batchSize != null) {
+ ret = ret.batchSize(batchSize);
+ }
+
+ if (numToSkip != null) {
+ ret = ret.skip(numToSkip);
+ }
+
+ if (limit != null) {
+ ret = ret.limit(limit);
+ }
+
+ if (MongoDbOutputType.MongoIterable.equals(endpoint.getOutputType())) {
+ // the caller asked for the iterable itself, so no cursor is opened here
+ result = ret;
+ } else {
+ // into() iterates a single cursor and always closes it; calling iterator() a second time
+ // just to close it would open, and run, another cursor while the first one stays open
+ List<Document> documents = new ArrayList<>();
+ ret.into(documents);
+ exchange1.getOut().setHeader(RESULT_PAGE_SIZE, documents.size());
+ result = documents;
+ }
+
+ return result;
+ };
+ }
+
+ private Function<Exchange, Object> createDoInsert() {
+ // insert: accepts either a single Document or a List of items convertible to Document
+ return exchange1 -> {
+ MongoCollection<Document> collection = calculateCollection(exchange1);
+
+ // single-document variant: the body converts to a Document
+ Document single = exchange1.getIn().getBody(Document.class);
+ if (single != null) {
+ collection.insertOne(single);
+ // publish the MONGO_ID value of the inserted document through the OID header of the IN message
+ exchange1.getIn().setHeader(OID, single.get(MONGO_ID));
+ return single;
+ }
+
+ // list variant: the body must be a List whose items all convert to Document
+ List<?> rawItems = exchange1.getIn().getBody(List.class);
+ if (rawItems == null) {
+ throw new CamelMongoDbException("MongoDB operation = insert, Body is not conversible to type Document nor List<Document>");
+ }
+ List<Document> batch = attemptConvertToList(rawItems, exchange1);
+ collection.insertMany(batch);
+
+ // the OID header carries the MONGO_ID values in the same order as the inserted documents
+ List<Object> insertedIds = new ArrayList<>(batch.size());
+ for (Document inserted : batch) {
+ insertedIds.add(inserted.get(MONGO_ID));
+ }
+ exchange1.getIn().setHeader(OID, insertedIds);
+
+ // the converted list itself is the operation's result
+ return batch;
+ };
+ }
+
+ private Function<Exchange, Object> createDoUpdate() {
+ return exchange1 -> {
+ try {
+ MongoCollection<Document> dbCol = calculateCollection(exchange1);
+ // update: the filter comes from the CRITERIA header; without it the body must be the two-element list [filter, update]
+ Bson updateCriteria = exchange1.getIn().getHeader(CRITERIA, Bson.class);
+ Bson objNew;
+ if (null == updateCriteria) {
+ @SuppressWarnings("unchecked")
+ List<Bson> saveObj = exchange1.getIn().getMandatoryBody((Class<List<Bson>>) Class.class.cast(List.class));
+ if (saveObj.size() != 2) {
+ throw new CamelMongoDbException("MongoDB operation = update, failed because body is not a List of Document objects with size = 2");
+ }
+
+ updateCriteria = saveObj.get(0);
+ objNew = saveObj.get(1);
+ } else {
+ objNew = exchange1.getIn().getMandatoryBody(Bson.class);
+ }
+ // optional headers: MULTIUPDATE selects updateMany over updateOne, UPSERT is passed on to the update options
+ Boolean multi = exchange1.getIn().getHeader(MULTIUPDATE, Boolean.class);
+ Boolean upsert = exchange1.getIn().getHeader(UPSERT, Boolean.class);
+
+ UpdateResult result;
+ UpdateOptions options = new UpdateOptions();
+ if (upsert != null) {
+ options.upsert(upsert);
+ }
+
+ if (multi == null || !multi) {
+ result = dbCol.updateOne(updateCriteria, objNew, options);
+ } else {
+ result = dbCol.updateMany(updateCriteria, objNew, options);
+ }
+ if (result.isModifiedCountAvailable()) { // RECORDS_AFFECTED is only set when the result reports a modified count as available
+ exchange1.getOut().setHeader(RECORDS_AFFECTED, result.getModifiedCount());
+ }
+ exchange1.getOut().setHeader(RECORDS_MATCHED, result.getMatchedCount());
+ return result;
+ } catch (InvalidPayloadException e) {
+ throw new CamelMongoDbException("Invalid payload for update", e);
+ }
+ };
+ }
+
+ private Function<Exchange, Object> createDoRemove() {
+ return exchange1 -> {
+ try {
+ // deletes every document matching the mandatory body; the count is only published for acknowledged writes
+ MongoCollection<Document> collection = calculateCollection(exchange1);
+ Document filter = exchange1.getIn().getMandatoryBody(Document.class);
+ DeleteResult outcome = collection.deleteMany(filter);
+ if (outcome.wasAcknowledged()) {
+ exchange1.getOut().setHeader(RECORDS_AFFECTED, outcome.getDeletedCount());
+ }
+ return outcome;
+ } catch (InvalidPayloadException e) {
+ throw new CamelMongoDbException("Invalid payload for remove", e);
+ }
+ };
+ }
+
+ private Function<Exchange, Object> createDoAggregate() {
+ // aggregate: the body must be a List of pipeline stages; the complete result
+ // is collected into a List<Document> and returned
+ return exchange1 -> {
+ try {
+ MongoCollection<Document> dbCol = calculateCollection(exchange1);
+
+ // Allow body to be a pipeline
+ // @see http://docs.mongodb.org/manual/core/aggregation/
+ // getMandatoryBody never returns null (it throws InvalidPayloadException instead),
+ // so there is no null case to fall back from
+ @SuppressWarnings("unchecked")
+ List<Bson> query = exchange1.getIn().getMandatoryBody((Class<List<Bson>>) Class.class.cast(List.class));
+
+ // the driver receives its own copy of the pipeline stages
+ List<Bson> queryList = query.stream().map(o -> (Bson) o).collect(Collectors.toList());
+ AggregateIterable<Document> aggregationResult = dbCol.aggregate(queryList);
+
+ // Impossible with java driver to get the batch size and number to skip
+ // into() drains the cursor and closes it even when iteration fails part-way,
+ // which a bare iterator().forEachRemaining(...) does not guarantee
+ List<Document> dbIterator = new ArrayList<>();
+ aggregationResult.into(dbIterator);
+ return dbIterator;
+ } catch (InvalidPayloadException e) {
+ throw new CamelMongoDbException("Invalid payload for aggregate", e);
+ }
+ };
+ }
+
+ private Function<Exchange, Object> createDoCommand() {
+ return exchange1 -> {
+ // the mandatory body is the command document; it is run against the resolved database
+ try {
+ return calculateDb(exchange1).runCommand(exchange1.getIn().getMandatoryBody(Document.class));
+ } catch (InvalidPayloadException e) {
+ throw new CamelMongoDbException("Invalid payload for command", e);
+ }
+
+ };
+ }
+
+ private Function<Exchange, Object> createDoGetDbStats() {
+ return exchange1 -> calculateDb(exchange1).runCommand(createDbStatsCommand()); // runs the dbStats command against the resolved database
+ }
+
+ private Function<Exchange, Object> createDoFindById() {
+ return exchange1 -> {
+ try {
+ MongoCollection<Document> collection = calculateCollection(exchange1);
+ // the mandatory body is used as-is as the value to match against MONGO_ID
+ Bson byId = Filters.eq(MONGO_ID, exchange1.getIn().getMandatoryBody());
+
+ // a missing projection header is replaced by an empty document
+ Bson projectionHeader = exchange1.getIn().getHeader(FIELDS_PROJECTION, Bson.class);
+ Bson projection = projectionHeader == null ? new Document() : projectionHeader;
+
+ Document found = collection.find(byId).projection(projection).first();
+ // RESULT_TOTAL_SIZE is 1 when a document was found and 0 otherwise
+ exchange1.getOut().setHeader(RESULT_TOTAL_SIZE, found == null ? 0 : 1);
+ return found;
+ } catch (InvalidPayloadException e) {
+ throw new CamelMongoDbException("Invalid payload for findById", e);
+ }
+ };
+ }
+
+ private Function<Exchange, Object> createDoSave() {
+ return exchange1 -> {
+ try {
+ MongoCollection<Document> collection = calculateCollection(exchange1);
+ Document toSave = exchange1.getIn().getMandatoryBody(Document.class);
+ UpdateOptions upsertOptions = new UpdateOptions().upsert(true);
+ Object existingId = toSave.get(MONGO_ID);
+ if (existingId != null) { // known id: replace (or insert) the document carrying that id
+ UpdateResult replaced = collection.replaceOne(eq(MONGO_ID, existingId), toSave, upsertOptions);
+ exchange1.getIn().setHeader(OID, existingId);
+ return replaced;
+ }
+ UpdateResult inserted = collection.replaceOne(Filters.where("false"), toSave, upsertOptions); // no id: the upsert runs with a $where filter of "false"
+ exchange1.getIn().setHeader(OID, inserted.getUpsertedId().asObjectId().getValue());
+ return inserted;
+ } catch (InvalidPayloadException e) {
+ throw new CamelMongoDbException("Body incorrect type for save", e);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java
new file mode 100644
index 0000000..4eba81f
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingConfig.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+/**
+ * Immutable holder for the tail tracking settings, exposed as public final fields.
+ * A null collection or field name is replaced by the corresponding default.
+ */
+public class MongoDbTailTrackingConfig {
+
+ public static final String DEFAULT_COLLECTION = "camelTailTracking";
+ public static final String DEFAULT_FIELD = "lastTrackingValue";
+
+ /** See {@link MongoDbEndpoint#setTailTrackIncreasingField(String)} */
+ public final String increasingField;
+ /** See {@link MongoDbEndpoint#setPersistentTailTracking(boolean)} */
+ public final boolean persistent;
+ /** See {@link MongoDbEndpoint#setTailTrackDb(String)} */
+ public final String db;
+ /** See {@link MongoDbEndpoint#setTailTrackCollection(String)} */
+ public final String collection;
+ /** See {@link MongoDbEndpoint#setTailTrackField(String)} */
+ public final String field;
+ /** See {@link MongoDbEndpoint#setPersistentId(String)} */
+ public final String persistentId;
+
+ /**
+ * Copies the given settings into the public final fields.
+ *
+ * @param tailTrackCollection tracking collection name, {@link #DEFAULT_COLLECTION} when null
+ * @param tailTrackField tracking field name, {@link #DEFAULT_FIELD} when null
+ */
+ public MongoDbTailTrackingConfig(boolean persistentTailTracking, String tailTrackIncreasingField, String tailTrackDb,
+ String tailTrackCollection, String tailTrackField, String persistentId) {
+ this.persistent = persistentTailTracking;
+ this.increasingField = tailTrackIncreasingField;
+ this.db = tailTrackDb;
+ this.persistentId = persistentId;
+
+ // fall back to the defaults only when no explicit name was supplied
+ this.collection = tailTrackCollection != null ? tailTrackCollection : DEFAULT_COLLECTION;
+ this.field = tailTrackField != null ? tailTrackField : DEFAULT_FIELD;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
new file mode 100644
index 0000000..e34643d
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailTrackingManager.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Updates;
+
+public class MongoDbTailTrackingManager {
+ // Keeps the last value seen for the increasing field and, when persistence is enabled, stores it in MongoDB.
+ private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailTrackingManager.class);
+ // last value seen; written by setLastVal and recoverFromStore, read by persistToStore
+ public Object lastVal;
+
+ private final MongoClient connection;
+ private final MongoDbTailTrackingConfig config;
+ private MongoCollection<Document> dbCol;
+ private Document trackingObj; // MONGO_ID-only filter identifying this manager's tracking document
+
+ public MongoDbTailTrackingManager(MongoClient connection, MongoDbTailTrackingConfig config) {
+ this.connection = connection;
+ this.config = config;
+ }
+ /** Loads, or creates, the tracking document for the configured persistentId; does nothing unless persistence is enabled. */
+ public void initialize() throws Exception {
+ if (!config.persistent) {
+ return;
+ }
+
+ dbCol = connection.getDatabase(config.db).getCollection(config.collection, Document.class);
+ Document filter = new Document("persistentId", config.persistentId);
+ trackingObj = dbCol.find(filter).first();
+ if (trackingObj == null) {
+ dbCol.insertOne(filter);
+ trackingObj = dbCol.find(filter).first();
+ }
+ // keep only the _id, the rest is useless and causes more overhead during update
+ trackingObj = new Document(MONGO_ID, trackingObj.get(MONGO_ID));
+ }
+ /** Writes lastVal into the configured field of the tracking document; does nothing without persistence or without a value. */
+ public synchronized void persistToStore() {
+ if (!config.persistent || lastVal == null) {
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Persisting lastVal={} to store, collection: {}", lastVal, config.collection);
+ }
+
+ Bson updateObj = Updates.set(config.field, lastVal);
+ // trackingObj stays the MONGO_ID-only filter: re-reading it with an unfiltered find() could pick up another persistentId's document
+ dbCol.updateOne(trackingObj, updateObj);
+ }
+ /** Reads the configured field of the tracking document back into lastVal and returns it; returns null without persistence. */
+ public synchronized Object recoverFromStore() {
+ if (!config.persistent) {
+ return null;
+ }
+
+ lastVal = dbCol.find(trackingObj).first().get(config.field);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recovered lastVal={} from store, collection: {}", lastVal, config.collection);
+ }
+
+ return lastVal;
+ }
+ /** Takes the increasing field's value from the given document as the new lastVal; ignored when no increasing field is configured. */
+ public void setLastVal(Document dbObj) {
+ if (config.increasingField == null) {
+ return;
+ }
+
+ lastVal = dbObj.get(config.increasingField);
+ }
+ /** Returns the configured increasing field name, which may be null. */
+ public String getIncreasingFieldName() {
+ return config.increasingField;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
new file mode 100644
index 0000000..69c8b7f
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailableCursorConsumer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+
+/**
+ * Consumer that runs a {@link MongoDbTailingProcess} on its own single-thread executor.
+ */
+public class MongoDbTailableCursorConsumer extends DefaultConsumer {
+ private final MongoDbEndpoint endpoint;
+ private ExecutorService executor;
+ private MongoDbTailingProcess tailingProcess;
+
+ public MongoDbTailableCursorConsumer(MongoDbEndpoint endpoint, Processor processor) {
+ super(endpoint, processor);
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ // a pool of exactly one thread is created to run the tailing process
+ executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), 1);
+ tailingProcess = new MongoDbTailingProcess(endpoint, this, initTailTracking());
+ tailingProcess.initializeProcess();
+ executor.execute(tailingProcess);
+ }
+
+ protected MongoDbTailTrackingManager initTailTracking() throws Exception {
+ MongoDbTailTrackingManager manager = new MongoDbTailTrackingManager(endpoint.getMongoConnection(), endpoint.getTailTrackingConfig());
+ manager.initialize();
+ return manager;
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ // stop the tailing process first, then release the executor that was running it
+ if (tailingProcess != null) {
+ tailingProcess.stop();
+ }
+ if (executor != null) {
+ endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
+ executor = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
new file mode 100644
index 0000000..f51dd48
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbTailingProcess.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.mongodb3;
+
+import static com.mongodb.client.model.Filters.gt;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.Exchange;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.CursorType;
+import com.mongodb.MongoCursorNotFoundException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+
+public class MongoDbTailingProcess implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailingProcess.class);
+ private static final String CAPPED_KEY = "capped";
+
+ public volatile boolean keepRunning = true;
+ public volatile boolean stopped; // = false
+ private volatile CountDownLatch stoppedLatch;
+
+ private final MongoCollection<Document> dbCol;
+ private final MongoDbEndpoint endpoint;
+ private final MongoDbTailableCursorConsumer consumer;
+
+ // create local, final copies of these variables for increased performance
+ private final long cursorRegenerationDelay;
+ private final boolean cursorRegenerationDelayEnabled;
+
+ private MongoCursor<Document> cursor;
+ private MongoDbTailTrackingManager tailTracking;
+
+ public MongoDbTailingProcess(MongoDbEndpoint endpoint, MongoDbTailableCursorConsumer consumer,
+ MongoDbTailTrackingManager tailTrack) {
+ this.endpoint = endpoint;
+ this.consumer = consumer;
+ this.dbCol = endpoint.getMongoCollection();
+ this.tailTracking = tailTrack;
+ this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
+ this.cursorRegenerationDelayEnabled = !(this.cursorRegenerationDelay == 0);
+ }
+
+ public MongoCursor<Document> getCursor() {
+ return cursor;
+ }
+
+ /**
+ * Initialise the tailing process, the cursor and if persistent tail
+ * tracking is enabled, recover the cursor from the persisted point. As part
+ * of the initialisation process, the component will validate that the
+ * collection we are targeting is 'capped'.
+ *
+ * @throws Exception
+ */
+ public void initializeProcess() throws Exception {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}",
+ "db: " + endpoint.getMongoDatabase() + ", col: " + endpoint.getCollection());
+ }
+
+ if (!isCollectionCapped()) {
+ throw new CamelMongoDbException(
+ "Tailable cursors are only compatible with capped collections, and collection "
+ + endpoint.getCollection() + " is not capped");
+ }
+ try {
+ // recover the last value from the store if it exists
+ tailTracking.recoverFromStore();
+ cursor = initializeCursor();
+ } catch (Exception e) {
+ throw new CamelMongoDbException("Exception occurred while initializing tailable cursor", e);
+ }
+
+ if (cursor == null) {
+ throw new CamelMongoDbException(
+ "Tailable cursor was not initialized, or cursor returned is dead on arrival");
+ }
+
+ }
+
+ private Boolean isCollectionCapped() {
+ return endpoint.getMongoDatabase().runCommand(createCollStatsCommand()).getBoolean(CAPPED_KEY);
+ }
+
+ private Document createCollStatsCommand() {
+ return new Document("collStats", endpoint.getCollection());
+ }
+
+ /**
+ * The heart of the tailing process.
+ */
+ @Override
+ public void run() {
+ stoppedLatch = new CountDownLatch(1);
+ while (keepRunning) {
+ doRun();
+ // if the previous call didn't return because we have stopped
+ // running, then regenerate the cursor
+ if (keepRunning) {
+ cursor.close();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Regenerating cursor with lastVal: {}, waiting {}ms first", tailTracking.lastVal,
+ cursorRegenerationDelay);
+ }
+
+ if (cursorRegenerationDelayEnabled) {
+ try {
+ Thread.sleep(cursorRegenerationDelay);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
+ cursor = initializeCursor();
+ }
+ }
+
+ stopped = true;
+ stoppedLatch.countDown();
+ }
+
+ protected void stop() throws Exception {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}",
+ "db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
+ }
+ keepRunning = false;
+ // close the cursor if it's open, so if it is blocked on hasNext() it
+ // will return immediately
+ if (cursor != null) {
+ cursor.close();
+ }
+ awaitStopped();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}",
+ "db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
+ }
+ }
+
+ /**
+ * The heart of the tailing process.
+ */
+ private void doRun() {
+ // while the cursor has more values, keepRunning is true and the
+ // cursorId is not 0, which symbolizes that the cursor is dead
+ try {
+ while (cursor.hasNext() && keepRunning) { // cursor.getCursorId() !=
+ // 0 &&
+ Document dbObj = cursor.next();
+ Exchange exchange = endpoint.createMongoDbExchange(dbObj);
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Sending exchange: {}, ObjectId: {}", exchange, dbObj.get(MONGO_ID));
+ }
+ consumer.getProcessor().process(exchange);
+ } catch (Exception e) {
+ // do nothing
+ }
+ tailTracking.setLastVal(dbObj);
+ }
+ } catch (MongoCursorNotFoundException e) {
+ // we only log the warning if we are not stopping, otherwise it is
+ // expected because the stop() method kills the cursor just in case
+ // it is blocked
+ // waiting for more data to arrive
+ if (keepRunning) {
+ LOG.debug(
+ "Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.",
+ e);
+ }
+ }
+
+ // the loop finished, persist the lastValue just in case we are shutting
+ // down
+ // TODO: perhaps add a functionality to persist every N records
+ tailTracking.persistToStore();
+ }
+
+ // no arguments, will ask DB what the last updated Id was (checking
+ // persistent storage)
+ private MongoCursor<Document> initializeCursor() {
+ Object lastVal = tailTracking.lastVal;
+ // lastVal can be null if we are initializing and there is no
+ // persistence enabled
+ MongoCursor<Document> answer;
+ if (lastVal == null) {
+ answer = dbCol.find().cursorType(CursorType.TailableAwait).iterator();
+ } else {
+ try (MongoCursor<Document> iterator =
+ dbCol.find(gt(tailTracking.getIncreasingFieldName(), lastVal)).cursorType(CursorType.TailableAwait).iterator();) {
+ answer = iterator;
+ }
+ }
+ return answer;
+ }
+
+ private void awaitStopped() throws InterruptedException {
+ if (!stopped) {
+ LOG.info("Going to wait for stopping");
+ stoppedLatch.await();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java
new file mode 100644
index 0000000..920d11a
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/converters/MongoDbBasicConverters.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3.converters;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.util.IOHelper;
+import org.bson.BsonArray;
+import org.bson.BsonValue;
+import org.bson.Document;
+import org.bson.codecs.BsonArrayCodec;
+import org.bson.codecs.BsonValueCodecProvider;
+import org.bson.codecs.DecoderContext;
+import org.bson.codecs.DocumentCodec;
+import org.bson.codecs.DocumentCodecProvider;
+import org.bson.codecs.ValueCodecProvider;
+import org.bson.codecs.configuration.CodecRegistries;
+import org.bson.codecs.configuration.CodecRegistry;
+import org.bson.conversions.Bson;
+import org.bson.json.JsonReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@Converter
+public final class MongoDbBasicConverters {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MongoDbBasicConverters.class);
+
+ // Jackson's ObjectMapper is thread-safe, so no need to create a pool nor synchronize access to it
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private MongoDbBasicConverters() {
+ }
+
+ @Converter
+ public static Document fromMapToDocument(Map<String, Object> map) {
+ return new Document(map);
+ }
+
+ @Converter
+ public static Map<String, Object> fromDocumentToMap(Document document) {
+ return document;
+ }
+
+ @Converter
+ public static Document fromStringToDocument(String s) {
+ Document answer = null;
+ try {
+ answer = Document.parse(s);
+ } catch (Exception e) {
+ LOG.warn("String -> Document conversion selected, but the following exception occurred. Returning null.", e);
+ }
+
+ return answer;
+ }
+
+ @Converter
+ public static Document fromFileToDocument(File f, Exchange exchange) throws FileNotFoundException {
+ return fromInputStreamToDocument(new FileInputStream(f), exchange);
+ }
+
+ @Converter
+ public static Document fromInputStreamToDocument(InputStream is, Exchange exchange) {
+ Document answer = null;
+ try {
+ byte[] input = IOConverter.toBytes(is);
+
+ if (isBson(input)) {
+ JsonReader reader = new JsonReader(new String(input));
+ DocumentCodec documentReader = new DocumentCodec();
+
+ answer = documentReader.decode(reader, DecoderContext.builder().build());
+ } else {
+ answer = Document.parse(IOConverter.toString(input, exchange));
+ }
+ } catch (Exception e) {
+ LOG.warn("String -> Document conversion selected, but the following exception occurred. Returning null.", e);
+ } finally {
+ // we need to make sure to close the input stream
+ IOHelper.close(is, "InputStream", LOG);
+ }
+ return answer;
+ }
+
+ /**
+ * If the input starts with any number of whitespace characters and then a '{' character, we
+ * assume it is JSON rather than BSON. There are probably no useful BSON blobs that fit this pattern
+ */
+ private static boolean isBson(byte[] input) {
+ int i = 0;
+ while (i < input.length) {
+ if (input[i] == '{') {
+ return false;
+ } else if (!Character.isWhitespace(input[i])) {
+ return true;
+ }
+ }
+ return true;
+ }
+
+ @Converter
+ public static Document fromAnyObjectToDocument(Object value) {
+ Document answer;
+ try {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> m = OBJECT_MAPPER.convertValue(value, Map.class);
+ answer = new Document(m);
+ } catch (Exception e) {
+ LOG.warn("Conversion has fallen back to generic Object -> Document, but unable to convert type {}. Returning null. {}",
+ value.getClass().getCanonicalName(), e.getClass().getCanonicalName() + ": " + e.getMessage());
+ return null;
+ }
+ return answer;
+ }
+
+ @Converter
+ public static List<Bson> fromStringToList(String value) {
+
+ final CodecRegistry codecRegistry = CodecRegistries.fromProviders(Arrays.asList(
+ new ValueCodecProvider(),
+ new BsonValueCodecProvider(),
+ new DocumentCodecProvider()));
+
+ JsonReader reader = new JsonReader(value);
+ BsonArrayCodec arrayReader = new BsonArrayCodec(codecRegistry);
+
+ BsonArray docArray = arrayReader.decode(reader, DecoderContext.builder().build());
+
+ List<Bson> answer = new ArrayList<>(docArray.size());
+
+ for (BsonValue doc : docArray) {
+ answer.add(doc.asDocument());
+ }
+ return answer;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java
new file mode 100644
index 0000000..40042b5
--- /dev/null
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/processor/idempotent/MongoDbIdempotentRepository.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3.processor.idempotent;
+
+import static com.mongodb.client.model.Filters.eq;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import com.mongodb.ErrorCategory;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.result.DeleteResult;
+
+@ManagedResource(description = "Mongo db based message id repository")
+public class MongoDbIdempotentRepository<E> extends ServiceSupport implements IdempotentRepository<E> {
+ private MongoClient mongoClient;
+ private String collectionName;
+ private String dbName;
+ private MongoCollection<Document> collection;
+
+ public MongoDbIdempotentRepository() {
+ }
+
+ public MongoDbIdempotentRepository(MongoClient mongoClient, String collectionName, String dbName) {
+ this.mongoClient = mongoClient;
+ this.collectionName = collectionName;
+ this.dbName = dbName;
+ this.collection = mongoClient.getDatabase(dbName).getCollection(collectionName);
+ }
+
+ @ManagedOperation(description = "Adds the key to the store")
+ @Override
+ public boolean add(E key) {
+ Document document = new Document(MONGO_ID, key);
+ try {
+ collection.insertOne(document);
+ } catch (com.mongodb.MongoWriteException ex) {
+ if (ex.getError().getCategory() == ErrorCategory.DUPLICATE_KEY) {
+ return false;
+ }
+ throw ex;
+ }
+ return true;
+ }
+
+ @ManagedOperation(description = "Does the store contain the given key")
+ @Override
+ public boolean contains(E key) {
+ Bson document = eq(MONGO_ID, key);
+ long count = collection.count(document);
+ return count > 0;
+ }
+
+ @ManagedOperation(description = "Remove the key from the store")
+ @Override
+ public boolean remove(E key) {
+ Bson document = eq(MONGO_ID, key);
+ DeleteResult res = collection.deleteOne(document);
+ return res.getDeletedCount() > 0;
+ }
+
+ @Override
+ public boolean confirm(E key) {
+ return true;
+ }
+
+ @ManagedOperation(description = "Clear the store")
+ @Override
+ public void clear() {
+ collection.deleteMany(new Document());
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ ObjectHelper.notNull(mongoClient, "cli");
+ ObjectHelper.notNull(dbName, "dbName");
+ ObjectHelper.notNull(collectionName, "collectionName");
+
+ if (collection == null) {
+ this.collection = mongoClient.getDatabase(dbName).getCollection(collectionName);
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ return;
+ }
+
+ public MongoClient getMongoClient() {
+ return mongoClient;
+ }
+
+ public void setMongoClient(MongoClient mongoClient) {
+ this.mongoClient = mongoClient;
+ }
+
+ public String getCollectionName() {
+ return collectionName;
+ }
+
+ public void setCollectionName(String collectionName) {
+ this.collectionName = collectionName;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public void setDbName(String dbName) {
+ this.dbName = dbName;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/resources/META-INF/LICENSE.txt
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/resources/META-INF/LICENSE.txt b/components/camel-mongodb3/src/main/resources/META-INF/LICENSE.txt
new file mode 100644
index 0000000..6b0b127
--- /dev/null
+++ b/components/camel-mongodb3/src/main/resources/META-INF/LICENSE.txt
@@ -0,0 +1,203 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/resources/META-INF/NOTICE.txt
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/resources/META-INF/NOTICE.txt b/components/camel-mongodb3/src/main/resources/META-INF/NOTICE.txt
new file mode 100644
index 0000000..2e215bf
--- /dev/null
+++ b/components/camel-mongodb3/src/main/resources/META-INF/NOTICE.txt
@@ -0,0 +1,11 @@
+ =========================================================================
+ == NOTICE file corresponding to the section 4 d of ==
+ == the Apache License, Version 2.0, ==
+ == in this case for the Apache Camel distribution. ==
+ =========================================================================
+
+ This product includes software developed by
+ The Apache Software Foundation (http://www.apache.org/).
+
+ Please read the different LICENSE files present in the licenses directory of
+ this distribution.
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
new file mode 100644
index 0000000..c9afc68
--- /dev/null
+++ b/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.camel.component.mongodb3.converters.MongoDbBasicConverters
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/component/mongodb3
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/component/mongodb3 b/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/component/mongodb3
new file mode 100644
index 0000000..ba9679f
--- /dev/null
+++ b/components/camel-mongodb3/src/main/resources/META-INF/services/org/apache/camel/component/mongodb3
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.mongodb3.MongoDbComponent
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java
new file mode 100644
index 0000000..52bdd6d
--- /dev/null
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/AbstractMongoDbTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import java.util.Formatter;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.component.mongodb3.CamelMongoDbException;
+import org.apache.camel.component.properties.PropertiesComponent;
+import org.apache.camel.spring.SpringCamelContext;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.bson.Document;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+
+
+/** Base class of the camel-mongodb3 tests: a Camel context built on {@link EmbedMongoConfiguration} plus two test collections. */
+public abstract class AbstractMongoDbTest extends CamelTestSupport {
+    protected static MongoClient mongo;
+    protected static MongoDatabase db;
+    protected static MongoCollection<Document> testCollection;
+    protected static MongoCollection<Document> dynamicCollection;
+
+    protected static String dbName = "test";
+    protected static String testCollectionName;
+    protected static String dynamicCollectionName;
+
+    protected ApplicationContext applicationContext;
+
+    /** Looks up the {@code myDb} client and drops both test collections (not the database: MongoDB would create large store files each time). */
+    @Override
+    public void doPostSetup() {
+        mongo = applicationContext.getBean("myDb", MongoClient.class);
+        db = mongo.getDatabase(dbName);
+        testCollectionName = "camelTest";
+        db.getCollection(testCollectionName, Document.class).drop();
+        testCollection = db.getCollection(testCollectionName, Document.class);
+        dynamicCollectionName = testCollectionName.concat("Dynamic");
+        db.getCollection(dynamicCollectionName, Document.class).drop();
+        dynamicCollection = db.getCollection(dynamicCollectionName, Document.class);
+    }
+
+    /** Drops both test collections (skipping any that were never assigned) and always runs the Camel tear-down, even if a drop throws. */
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (testCollection != null) {
+                testCollection.drop();
+            }
+            if (dynamicCollection != null) {
+                dynamicCollection.drop();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /** Creates the Spring-backed Camel context and registers {@code mongodb.test.properties} as its properties component. */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        applicationContext = new AnnotationConfigApplicationContext(EmbedMongoConfiguration.class);
+        @SuppressWarnings("deprecation")
+        CamelContext ctx = SpringCamelContext.springCamelContext(applicationContext);
+        ctx.addComponent("properties", new PropertiesComponent("classpath:mongodb.test.properties"));
+        return ctx;
+    }
+
+    /** Inserts 1000 documents, 100 for each of the ten scientists, into {@link #testCollection} and asserts the final count. */
+    protected void pumpDataIntoTestCollection() {
+        String[] scientists = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
+        for (int i = 1; i <= 1000; i++) {
+            Formatter f = new Formatter();
+            String doc = f.format("{\"_id\":\"%d\", \"scientist\":\"%s\", \"fixedField\": \"fixedValue\"}", i, scientists[i % scientists.length]).toString();
+            IOHelper.close(f);
+            testCollection.insertOne(Document.parse(doc));
+        }
+        assertEquals("Data pumping of 1000 entries did not complete entirely", 1000L, testCollection.count());
+    }
+
+    /** Asserts {@code result} is a CamelExecutionException caused by a CamelMongoDbException (containing {@code message} unless null); returns the cause. */
+    protected CamelMongoDbException extractAndAssertCamelMongoDbException(Object result, String message) {
+        assertTrue("Result is not an Exception", result instanceof Throwable);
+        assertTrue("Result is not an CamelExecutionException", result instanceof CamelExecutionException);
+        Throwable exc = ((CamelExecutionException) result).getCause();
+        assertTrue("Result is not an CamelMongoDbException", exc instanceof CamelMongoDbException);
+        CamelMongoDbException camelExc = ObjectHelper.cast(CamelMongoDbException.class, exc);
+        if (message != null) {
+            assertTrue("CamelMongoDbException doesn't contain desired message string", camelExc.getMessage().contains(message));
+        }
+        return camelExc;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
new file mode 100644
index 0000000..03b34c3
--- /dev/null
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/EmbedMongoConfiguration.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import static com.mongodb.MongoClientOptions.builder;
+import static de.flapdoodle.embed.mongo.distribution.Version.Main.PRODUCTION;
+import static de.flapdoodle.embed.process.runtime.Network.localhostIsIPv6;
+import static org.springframework.util.SocketUtils.findAvailableTcpPort;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.ReadPreference;
+import com.mongodb.WriteConcern;
+
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.IMongodConfig;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+
+@Configuration
+public class EmbedMongoConfiguration {
+
+ private static final int PORT = findAvailableTcpPort();
+
+ static {
+ try {
+ IMongodConfig mongodConfig = new MongodConfigBuilder()
+ .version(PRODUCTION)
+ .net(new Net(PORT, localhostIsIPv6()))
+ .build();
+ MongodExecutable mongodExecutable = MongodStarter.getDefaultInstance().prepare(mongodConfig);
+ mongodExecutable.start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Bean
+ public MongoClient myDb() throws UnknownHostException {
+ return new MongoClient("0.0.0.0", PORT);
+ }
+
+ @Bean
+ public MongoClient myDbP() throws UnknownHostException {
+ MongoClientURI uri = new MongoClientURI("mongodb://localhost:"+PORT, builder().readPreference(ReadPreference.primary()));
+ return new MongoClient(uri);
+ }
+
+ @Bean
+ public MongoClient myDbPP() throws UnknownHostException {
+ MongoClientURI uri = new MongoClientURI("mongodb://localhost:"+PORT, builder().readPreference(ReadPreference.primaryPreferred()));
+ return new MongoClient(uri);
+ }
+
+ @Bean
+ public MongoClient myDbS() throws UnknownHostException {
+ MongoClientURI uri = new MongoClientURI("mongodb://localhost:"+PORT, builder().readPreference(ReadPreference.secondary()));
+ return new MongoClient(uri);
+ }
+
+ @Bean
+ public MongoClient myDbWCA() throws UnknownHostException {
+ MongoClientURI uri = new MongoClientURI("mongodb://localhost:"+PORT, builder().writeConcern(WriteConcern.ACKNOWLEDGED));
+ return new MongoClient(uri);
+ }
+
+ @Bean
+ public MongoClient myDbSP() throws UnknownHostException {
+ MongoClientURI uri = new MongoClientURI("mongodb://localhost:"+PORT, builder().readPreference(ReadPreference.secondaryPreferred()));
+ return new MongoClient(uri);
+ }
+
+ @Bean
+ public MongoClient myDbN() throws UnknownHostException {
+ MongoClientURI uri = new MongoClientURI("mongodb://localhost:"+PORT, builder().readPreference(ReadPreference.nearest()));
+ return new MongoClient(uri);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java
new file mode 100644
index 0000000..33b7eeb
--- /dev/null
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoBasicOperationsConfiguration.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.ImportResource;
+
+@Configuration // Test configuration that only aggregates other bean sources; it declares no beans itself.
+@Import(EmbedMongoConfiguration.class) // MongoClient beans backed by the embedded mongod
+@ImportResource("org/apache/camel/component/mongodb3/mongoBasicOperationsTest.xml") // bean definitions from the classpath XML file
+public class MongoBasicOperationsConfiguration {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/7bd7750e/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java
new file mode 100644
index 0000000..abb265c
--- /dev/null
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbConversionsTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import static com.mongodb.client.model.Filters.eq;
+import static org.apache.camel.component.mongodb3.MongoDbConstants.MONGO_ID;
+
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.converter.IOConverter;
+import org.bson.Document;
+import org.junit.Test;
+
+public class MongoDbConversionsTest extends AbstractMongoDbTest {
+
+ @Test
+ public void testInsertMap() throws InterruptedException {
+ assertEquals(0, testCollection.count());
+
+ Map<String, Object> m1 = new HashMap<>();
+ Map<String, String> m1Nested = new HashMap<>();
+
+ m1Nested.put("nested1", "nestedValue1");
+ m1Nested.put("nested2", "nestedValue2");
+
+ m1.put("field1", "value1");
+ m1.put("field2", "value2");
+ m1.put("nestedField", m1Nested);
+ m1.put(MONGO_ID, "testInsertMap");
+
+// Object result =
+ template.requestBody("direct:insertMap", m1);
+ Document b = testCollection.find(eq(MONGO_ID, "testInsertMap")).first();
+ assertNotNull("No record with 'testInsertMap' _id", b);
+
+ }
+
+ @Test
+ public void testInsertPojo() {
+ assertEquals(0, testCollection.count());
+// Object result =
+ template.requestBody("direct:insertPojo", new MyPojoTest());
+ Document b = testCollection.find(eq(MONGO_ID, "testInsertPojo")).first();
+ assertNotNull("No record with 'testInsertPojo' _id", b);
+ }
+
+ @Test
+ public void testInsertJsonString() {
+ assertEquals(0, testCollection.count());
+// Object result =
+ template.requestBody("direct:insertJsonString", "{\"fruits\": [\"apple\", \"banana\", \"papaya\"], \"veggie\": \"broccoli\", \"_id\": \"testInsertJsonString\"}");
+ //assertTrue(result instanceof WriteResult);
+ Document b = testCollection.find(eq(MONGO_ID, "testInsertJsonString")).first();
+ assertNotNull("No record with 'testInsertJsonString' _id", b);
+ }
+
+ @Test
+ public void testInsertJsonInputStream() throws Exception {
+ assertEquals(0, testCollection.count());
+// Object result =
+ template.requestBody("direct:insertJsonString",
+ IOConverter.toInputStream("{\"fruits\": [\"apple\", \"banana\"], \"veggie\": \"broccoli\", \"_id\": \"testInsertJsonString\"}\n", null));
+ Document b = testCollection.find(eq(MONGO_ID, "testInsertJsonString")).first();
+ assertNotNull("No record with 'testInsertJsonString' _id", b);
+ }
+
+ @Test
+ public void testInsertBsonInputStream() {
+ assertEquals(0, testCollection.count());
+
+ Document document = new Document(MONGO_ID, "testInsertBsonString");
+
+// Object result =
+ template.requestBody("direct:insertJsonString", new ByteArrayInputStream(document.toJson().getBytes()));
+ Document b = testCollection.find(eq(MONGO_ID, "testInsertBsonString")).first();
+ assertNotNull("No record with 'testInsertBsonString' _id", b);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+
+ from("direct:insertMap").to("mongodb3:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+ from("direct:insertPojo").to("mongodb3:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+ from("direct:insertJsonString").to("mongodb3:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
+ from("direct:insertJsonStringWriteResultInString").to("mongodb3:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert").convertBodyTo(String.class);
+
+ }
+ };
+ }
+
+ @SuppressWarnings("unused")
+ private class MyPojoTest {
+ public int number = 123;
+ public String text = "hello";
+ public String[] array = {"daVinci", "copernico", "einstein"};
+ // CHECKSTYLE:OFF
+ public String _id = "testInsertPojo";
+ // CHECKSTYLE:ON
+ }
+
+}