You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/07 18:33:50 UTC
[2/3] git commit: CAMEL-6000: Add collectionIndex option to
camel-mongodb. Thanks to Pierre-Yves Nicolas for the patch.
CAMEL-6000: Add collectionIndex option to camel-mongodb. Thanks to Pierre-Yves Nicolas for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d5f42992
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d5f42992
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d5f42992
Branch: refs/heads/master
Commit: d5f429924c16418a9ceb672d40f01c2743b48af5
Parents: 9ecc122
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Aug 7 16:43:24 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Aug 7 18:33:33 2013 +0200
----------------------------------------------------------------------
.../component/mongodb/MongoDbConstants.java | 10 +-
.../component/mongodb/MongoDbEndpoint.java | 280 +++++++++++++------
.../component/mongodb/MongoDbProducer.java | 146 +++++-----
.../component/mongodb/MongoDbIndexTest.java | 193 +++++++++++++
4 files changed, 476 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d5f42992/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
index 784a4e9..3eff7c5 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
@@ -17,8 +17,8 @@
package org.apache.camel.component.mongodb;
public final class MongoDbConstants {
-
- public static final String OPERATION_HEADER = "CamelMongoDbOperation";
+
+ public static final String OPERATION_HEADER = "CamelMongoDbOperation";
public static final String RESULT_TOTAL_SIZE = "CamelMongoDbResultTotalSize";
public static final String RESULT_PAGE_SIZE = "CamelMongoDbResultPageSize";
public static final String FIELDS_FILTER = "CamelMongoDbFieldsFilter";
@@ -32,11 +32,13 @@ public final class MongoDbConstants {
public static final String SORT_BY = "CamelMongoDbSortBy";
public static final String DATABASE = "CamelMongoDbDatabase";
public static final String COLLECTION = "CamelMongoDbCollection";
+ public static final String COLLECTION_INDEX = "CamelMongoDbCollectionIndex";
public static final String WRITECONCERN = "CamelMongoDbWriteConcern";
public static final String LIMIT = "CamelMongoDbLimit";
public static final String FROM_TAILABLE = "CamelMongoDbTailable";
public static final String WRITERESULT = "CamelMongoWriteResult";
- private MongoDbConstants() { }
-
+ private MongoDbConstants() {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d5f42992/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index 97d410e..c216ccb 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -16,6 +16,13 @@
*/
package org.apache.camel.component.mongodb;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
@@ -23,7 +30,6 @@ import com.mongodb.Mongo;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;
-
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -33,14 +39,14 @@ import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.util.ObjectHelper;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Represents a MongoDb endpoint.
- * It is responsible for creating {@link MongoDbProducer} and {@link MongoDbTailableCursorConsumer} instances.
- * It accepts a number of options to customise the behaviour of consumers and producers.
+ * Represents a MongoDb endpoint. It is responsible for creating
+ * {@link MongoDbProducer} and {@link MongoDbTailableCursorConsumer} instances.
+ * It accepts a number of options to customise the behaviour of consumers and
+ * producers.
*/
public class MongoDbEndpoint extends DefaultEndpoint {
@@ -48,6 +54,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
private Mongo mongoConnection;
private String database;
private String collection;
+ private String collectionIndex;
private MongoDbOperation operation;
private boolean createCollection = true;
private boolean invokeGetLastError; // = false
@@ -60,16 +67,16 @@ public class MongoDbEndpoint extends DefaultEndpoint {
private MongoDbConsumerType consumerType;
private long cursorRegenerationDelay = 1000L;
private String tailTrackIncreasingField;
-
+
// persitent tail tracking
private boolean persistentTailTracking; // = false;
private String persistentId;
private String tailTrackDb;
private String tailTrackCollection;
private String tailTrackField;
-
+
private MongoDbTailTrackingConfig tailTrackingConfig;
-
+
private DBCollection dbCollection;
private DB db;
@@ -88,7 +95,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
}
// ======= Implementation methods =====================================
-
+
public Producer createProducer() throws Exception {
validateOptions('P');
initializeConnection();
@@ -100,12 +107,12 @@ public class MongoDbEndpoint extends DefaultEndpoint {
// we never create the collection
createCollection = false;
initializeConnection();
-
+
// select right consumer type
if (consumerType == null) {
consumerType = MongoDbConsumerType.tailable;
}
-
+
Consumer consumer;
if (consumerType == MongoDbConsumerType.tailable) {
consumer = new MongoDbTailableCursorConsumer(this, processor);
@@ -121,17 +128,17 @@ public class MongoDbEndpoint extends DefaultEndpoint {
// make our best effort to validate, options with defaults are checked against their defaults, which is not always a guarantee that
// they haven't been explicitly set, but it is enough
if (role == 'P') {
- if (!ObjectHelper.isEmpty(consumerType) || persistentTailTracking || !ObjectHelper.isEmpty(tailTrackDb)
+ if (!ObjectHelper.isEmpty(consumerType) || persistentTailTracking || !ObjectHelper.isEmpty(tailTrackDb)
|| !ObjectHelper.isEmpty(tailTrackCollection) || !ObjectHelper.isEmpty(tailTrackField) || cursorRegenerationDelay != 1000L) {
throw new IllegalArgumentException("consumerType, tailTracking, cursorRegenerationDelay options cannot appear on a producer endpoint");
}
} else if (role == 'C') {
- if (!ObjectHelper.isEmpty(operation) || !ObjectHelper.isEmpty(writeConcern) || writeConcernRef != null
+ if (!ObjectHelper.isEmpty(operation) || !ObjectHelper.isEmpty(writeConcern) || writeConcernRef != null
|| readPreference != null || dynamicity || invokeGetLastError) {
- throw new IllegalArgumentException("operation, writeConcern, writeConcernRef, readPreference, dynamicity, invokeGetLastError "
+ throw new IllegalArgumentException("operation, writeConcern, writeConcernRef, readPreference, dynamicity, invokeGetLastError "
+ "options cannot appear on a consumer endpoint");
}
-
+
if (consumerType == MongoDbConsumerType.tailable) {
if (tailTrackIncreasingField == null) {
throw new IllegalArgumentException("tailTrackIncreasingField option must be set for tailable cursor MongoDB consumer endpoint");
@@ -140,7 +147,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
throw new IllegalArgumentException("persistentId is compulsory for persistent tail tracking");
}
}
-
+
} else {
throw new IllegalArgumentException("Unknown endpoint role");
}
@@ -149,9 +156,11 @@ public class MongoDbEndpoint extends DefaultEndpoint {
public boolean isSingleton() {
return true;
}
-
+
/**
- * Initialises the MongoDB connection using the Mongo object provided to the endpoint
+ * Initialises the MongoDB connection using the Mongo object provided to the
+ * endpoint
+ *
* @throws CamelMongoDbException
*/
public void initializeConnection() throws CamelMongoDbException {
@@ -167,20 +176,66 @@ public class MongoDbEndpoint extends DefaultEndpoint {
throw new CamelMongoDbException("Could not initialise MongoDbComponent. Collection " + collection + " and createCollection is false.");
}
dbCollection = db.getCollection(collection);
-
- LOG.info("MongoDb component initialised and endpoint bound to MongoDB collection with the following paramters. Address list: {}, Db: {}, Collection: {}",
- new Object[] {mongoConnection.getAllAddress().toString(), db.getName(), dbCollection.getName()});
+
+ LOG.debug("MongoDb component initialised and endpoint bound to MongoDB collection with the following parameters. Address list: {}, Db: {}, Collection: {}",
+ new Object[]{mongoConnection.getAllAddress().toString(), db.getName(), dbCollection.getName()});
+
+ try {
+ if (ObjectHelper.isNotEmpty(collectionIndex)) {
+ ensureIndex(dbCollection, createIndex());
+ }
+ } catch (Exception e) {
+ throw new CamelMongoDbException("Error creating index", e);
+ }
+ }
+
+ /**
+ * Add Index
+ *
+ * @param collection
+ */
+ public void ensureIndex(DBCollection collection, List<DBObject> dynamicIndex) {
+ collection.dropIndexes();
+ if (dynamicIndex != null && !dynamicIndex.isEmpty()) {
+ for (DBObject index : dynamicIndex) {
+ LOG.debug("create BDObject Index {}", index);
+ collection.ensureIndex(index);
+ }
+ }
+ }
+
+ /**
+ * Create technical list index
+ *
+ * @return technical list index
+ */
+ @SuppressWarnings("unchecked")
+ public List<DBObject> createIndex() throws Exception {
+ List<DBObject> indexList = new ArrayList<DBObject>();
+
+ if (ObjectHelper.isNotEmpty(collectionIndex)) {
+ HashMap<String, String> indexMap = new ObjectMapper().readValue(collectionIndex, HashMap.class);
+
+ for (Map.Entry<String, String> set : indexMap.entrySet()) {
+ DBObject index = new BasicDBObject();
+ index.put(set.getKey(), set.getValue());
+
+ indexList.add(index);
+ }
+ }
+ return indexList;
}
/**
- * Applies validation logic specific to this endpoint type. If everything succeeds, continues initialization
+ * Applies validation logic specific to this endpoint type. If everything
+ * succeeds, continues initialization
*/
@Override
protected void doStart() throws Exception {
if (writeConcern != null && writeConcernRef != null) {
- LOG.error("Cannot set both writeConcern and writeConcernRef at the same time. Respective values: {}, {}. "
- + "Aborting initialization.", new Object[] {writeConcern, writeConcernRef});
- throw new IllegalArgumentException("Cannot set both writeConcern and writeConcernRef at the same time on MongoDB endpoint");
+ String msg = "Cannot set both writeConcern and writeConcernRef at the same time. Respective values: " + writeConcern
+ + ", " + writeConcernRef + ". Aborting initialization.";
+ throw new IllegalArgumentException(msg);
}
setWriteReadOptionsOnConnection();
@@ -193,12 +248,12 @@ public class MongoDbEndpoint extends DefaultEndpoint {
message.setHeader(MongoDbConstants.DATABASE, database);
message.setHeader(MongoDbConstants.COLLECTION, collection);
message.setHeader(MongoDbConstants.FROM_TAILABLE, true);
-
+
message.setBody(dbObj);
exchange.setIn(message);
return exchange;
}
-
+
private void setWriteReadOptionsOnConnection() {
// Set the WriteConcern
if (writeConcern != null) {
@@ -206,18 +261,19 @@ public class MongoDbEndpoint extends DefaultEndpoint {
} else if (writeConcernRef != null) {
mongoConnection.setWriteConcern(writeConcernRef);
}
-
+
// Set the ReadPreference
if (readPreference != null) {
mongoConnection.setReadPreference(readPreference);
}
}
-
-
- // ======= Getters and setters ===============================================
-
+
+ // ======= Getters and setters
+ // ===============================================
+
/**
* Sets the name of the MongoDB collection to bind to this endpoint
+ *
* @param collection collection name
*/
public void setCollection(String collection) {
@@ -229,7 +285,21 @@ public class MongoDbEndpoint extends DefaultEndpoint {
}
/**
- * Sets the operation this endpoint will execute against MongoDB. For possible values, see {@link MongoDbOperation}.
+ * Sets the collection index (JSON FORMAT : { "field1" : "order", "field2" :
+ * "order"})
+ */
+ public void setCollectionIndex(String collectionIndex) {
+ this.collectionIndex = collectionIndex;
+ }
+
+ public String getCollectionIndex() {
+ return collectionIndex;
+ }
+
+ /**
+ * Sets the operation this endpoint will execute against MongoDB. For
+ * possible values, see {@link MongoDbOperation}.
+ *
* @param operation name of the operation as per catalogued values
* @throws CamelMongoDbException
*/
@@ -247,6 +317,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
/**
* Sets the name of the MongoDB database to target
+ *
* @param database name of the MongoDB database
*/
public void setDatabase(String database) {
@@ -258,7 +329,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
}
/**
- * Create collection during initialisation if it doesn't exist. Default is true.
+ * Create collection during initialisation if it doesn't exist. Default is
+ * true.
+ *
* @param createCollection true or false
*/
public void setCreateCollection(boolean createCollection) {
@@ -276,9 +349,10 @@ public class MongoDbEndpoint extends DefaultEndpoint {
public DBCollection getDbCollection() {
return dbCollection;
}
-
+
/**
* Sets the Mongo instance that represents the backing connection
+ *
* @param mongoConnection the connection to the database
*/
public void setMongoConnection(Mongo mongoConnection) {
@@ -290,10 +364,14 @@ public class MongoDbEndpoint extends DefaultEndpoint {
}
/**
- * Set the {@link WriteConcern} for write operations on MongoDB using the standard ones.
- * Resolved from the fields of the WriteConcern class by calling the {@link WriteConcern#valueOf(String)} method.
+ * Set the {@link WriteConcern} for write operations on MongoDB using the
+ * standard ones. Resolved from the fields of the WriteConcern class by
+ * calling the {@link WriteConcern#valueOf(String)} method.
+ *
* @param writeConcern the standard name of the WriteConcern
- * @see <a href="http://api.mongodb.org/java/current/com/mongodb/WriteConcern.html#valueOf(java.lang.String)">possible options</a>
+ * @see <a
+ * href="http://api.mongodb.org/java/current/com/mongodb/WriteConcern.html#valueOf(java.lang.String)">possible
+ * options</a>
*/
public void setWriteConcern(String writeConcern) {
this.writeConcern = WriteConcern.valueOf(writeConcern);
@@ -304,9 +382,11 @@ public class MongoDbEndpoint extends DefaultEndpoint {
}
/**
- * Instructs this endpoint to invoke {@link WriteResult#getLastError()} with every operation. By default, MongoDB does not wait
- * for the write operation to occur before returning. If set to true, each exchange will only return after the write operation
- * has actually occurred in MongoDB.
+ * Instructs this endpoint to invoke {@link WriteResult#getLastError()} with
+ * every operation. By default, MongoDB does not wait for the write
+ * operation to occur before returning. If set to true, each exchange will
+ * only return after the write operation has actually occurred in MongoDB.
+ *
* @param invokeGetLastError true or false
*/
public void setInvokeGetLastError(boolean invokeGetLastError) {
@@ -320,16 +400,17 @@ public class MongoDbEndpoint extends DefaultEndpoint {
/**
* Set the {@link WriteConcern} for write operations on MongoDB, passing in the bean ref to a custom WriteConcern which exists in the Registry.
* You can also use standard WriteConcerns by passing in their key. See the {@link #setWriteConcern(String) setWriteConcern} method.
+ *
* @param writeConcernRef the name of the bean in the registry that represents the WriteConcern to use
*/
public void setWriteConcernRef(String writeConcernRef) {
WriteConcern wc = this.getCamelContext().getRegistry().lookupByNameAndType(writeConcernRef, WriteConcern.class);
if (wc == null) {
- LOG.error("Camel MongoDB component could not find the WriteConcern in the Registry. Verify that the "
- + "provided bean name ({}) is correct. Aborting initialization.", writeConcernRef);
- throw new IllegalArgumentException("Camel MongoDB component could not find the WriteConcern in the Registry");
+ String msg = "Camel MongoDB component could not find the WriteConcern in the Registry. Verify that the "
+ + "provided bean name (" + writeConcernRef + ") is correct. Aborting initialization.";
+ throw new IllegalArgumentException(msg);
}
-
+
this.writeConcernRef = wc;
}
@@ -337,9 +418,11 @@ public class MongoDbEndpoint extends DefaultEndpoint {
return writeConcernRef;
}
- /**
- * Sets a MongoDB {@link ReadPreference} on the Mongo connection. Read preferences set directly on the connection will be
- * overridden by this setting.
+ /**
+ * Sets a MongoDB {@link ReadPreference} on the Mongo connection. Read
+ * preferences set directly on the connection will be overridden by this
+ * setting.
+ *
* @param readPreference the bean name of the read preference to set
*/
public void setReadPreference(String readPreference) {
@@ -354,23 +437,28 @@ public class MongoDbEndpoint extends DefaultEndpoint {
break;
}
}
-
- LOG.error("Could not resolve specified ReadPreference of type {}. Read preferences are resolved from inner "
- + "classes of com.mongodb.ReadPreference.", readPreference);
- throw new IllegalArgumentException("MongoDB endpoint could not resolve specified ReadPreference");
+
+ String msg = "Could not resolve specified ReadPreference of type " + readPreference
+ + ". Read preferences are resolved from inner classes of com.mongodb.ReadPreference.";
+ throw new IllegalArgumentException(msg);
}
-
+
public ReadPreference getReadPreference() {
return readPreference;
}
/**
- * Sets whether this endpoint will attempt to dynamically resolve the target database and collection from the incoming Exchange properties.
- * Can be used to override at runtime the database and collection specified on the otherwise static endpoint URI.
- * It is disabled by default to boost performance. Enabling it will take a minimal performance hit.
+ * Sets whether this endpoint will attempt to dynamically resolve the target
+ * database and collection from the incoming Exchange properties. Can be
+ * used to override at runtime the database and collection specified on the
+ * otherwise static endpoint URI. It is disabled by default to boost
+ * performance. Enabling it will take a minimal performance hit.
+ *
+ * @param dynamicity true or false indicated whether target database and
+ * collection should be calculated dynamically based on Exchange
+ * properties.
* @see MongoDbConstants#DATABASE
* @see MongoDbConstants#COLLECTION
- * @param dynamicity true or false indicated whether target database and collection should be calculated dynamically based on Exchange properties.
*/
public void setDynamicity(boolean dynamicity) {
this.dynamicity = dynamicity;
@@ -381,7 +469,8 @@ public class MongoDbEndpoint extends DefaultEndpoint {
}
/**
- * Reserved for future use, when more consumer types are supported.
+ * Reserved for future use, when more consumer types are supported.
+ *
* @param consumerType key of the consumer type
* @throws CamelMongoDbException
*/
@@ -396,15 +485,17 @@ public class MongoDbEndpoint extends DefaultEndpoint {
public MongoDbConsumerType getConsumerType() {
return consumerType;
}
-
+
public String getTailTrackDb() {
return tailTrackDb;
}
/**
- * Indicates what database the tail tracking mechanism will persist to. If not specified, the current database will
- * be picked by default. Dynamicity will not be taken into account even if enabled, i.e. the tail tracking database
- * will not vary past endpoint initialisation.
+ * Indicates what database the tail tracking mechanism will persist to. If
+ * not specified, the current database will be picked by default. Dynamicity
+ * will not be taken into account even if enabled, i.e. the tail tracking
+ * database will not vary past endpoint initialisation.
+ *
* @param tailTrackDb database name
*/
public void setTailTrackDb(String tailTrackDb) {
@@ -416,8 +507,10 @@ public class MongoDbEndpoint extends DefaultEndpoint {
}
/**
- * Collection where tail tracking information will be persisted. If not specified, {@link MongoDbTailTrackingConfig#DEFAULT_COLLECTION}
- * will be used by default.
+ * Collection where tail tracking information will be persisted. If not
+ * specified, {@link MongoDbTailTrackingConfig#DEFAULT_COLLECTION} will be
+ * used by default.
+ *
* @param tailTrackCollection collection name
*/
public void setTailTrackCollection(String tailTrackCollection) {
@@ -429,8 +522,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
}
/**
- * Field where the last tracked value will be placed. If not specified, {@link MongoDbTailTrackingConfig#DEFAULT_FIELD}
- * will be used by default.
+ * Field where the last tracked value will be placed. If not specified,
+ * {@link MongoDbTailTrackingConfig#DEFAULT_FIELD} will be used by default.
+ *
* @param tailTrackField field name
*/
public void setTailTrackField(String tailTrackField) {
@@ -438,8 +532,11 @@ public class MongoDbEndpoint extends DefaultEndpoint {
}
/**
- * Enable persistent tail tracking, which is a mechanism to keep track of the last consumed message across system restarts.
- * The next time the system is up, the endpoint will recover the cursor from the point where it last stopped slurping records.
+ * Enable persistent tail tracking, which is a mechanism to keep track of
+ * the last consumed message across system restarts. The next time the
+ * system is up, the endpoint will recover the cursor from the point where
+ * it last stopped slurping records.
+ *
* @param persistentTailTracking true or false
*/
public void setPersistentTailTracking(boolean persistentTailTracking) {
@@ -449,14 +546,16 @@ public class MongoDbEndpoint extends DefaultEndpoint {
public boolean isPersistentTailTracking() {
return persistentTailTracking;
}
-
+
/**
- * Correlation field in the incoming record which is of increasing nature and will be used to position the tailing cursor every
- * time it is generated.
- * The cursor will be (re)created with a query of type: tailTrackIncreasingField > lastValue (possibly recovered from persistent
- * tail tracking).
- * Can be of type Integer, Date, String, etc.
- * NOTE: No support for dot notation at the current time, so the field should be at the top level of the document.
+ * Correlation field in the incoming record which is of increasing nature
+ * and will be used to position the tailing cursor every time it is
+ * generated. The cursor will be (re)created with a query of type:
+ * tailTrackIncreasingField > lastValue (possibly recovered from persistent
+ * tail tracking). Can be of type Integer, Date, String, etc. NOTE: No
+ * support for dot notation at the current time, so the field should be at
+ * the top level of the document.
+ *
* @param tailTrackIncreasingField
*/
public void setTailTrackIncreasingField(String tailTrackIncreasingField) {
@@ -469,16 +568,20 @@ public class MongoDbEndpoint extends DefaultEndpoint {
public MongoDbTailTrackingConfig getTailTrackingConfig() {
if (tailTrackingConfig == null) {
- tailTrackingConfig = new MongoDbTailTrackingConfig(persistentTailTracking, tailTrackIncreasingField,
- tailTrackDb == null ? database : tailTrackDb, tailTrackCollection, tailTrackField, getPersistentId());
+ tailTrackingConfig = new MongoDbTailTrackingConfig(persistentTailTracking, tailTrackIncreasingField, tailTrackDb == null ? database : tailTrackDb, tailTrackCollection,
+ tailTrackField, getPersistentId());
}
- return tailTrackingConfig;
+ return tailTrackingConfig;
}
/**
- * MongoDB tailable cursors will block until new data arrives. If no new data is inserted, after some time the cursor will be automatically
- * freed and closed by the MongoDB server. The client is expected to regenerate the cursor if needed. This value specifies the time to wait
- * before attempting to fetch a new cursor, and if the attempt fails, how long before the next attempt is made. Default value is 1000ms.
+ * MongoDB tailable cursors will block until new data arrives. If no new
+ * data is inserted, after some time the cursor will be automatically freed
+ * and closed by the MongoDB server. The client is expected to regenerate
+ * the cursor if needed. This value specifies the time to wait before
+ * attempting to fetch a new cursor, and if the attempt fails, how long
+ * before the next attempt is made. Default value is 1000ms.
+ *
* @param cursorRegenerationDelay delay specified in milliseconds
*/
public void setCursorRegenerationDelay(long cursorRegenerationDelay) {
@@ -490,9 +593,12 @@ public class MongoDbEndpoint extends DefaultEndpoint {
}
/**
- * One tail tracking collection can host many trackers for several tailable consumers.
- * To keep them separate, each tracker should have its own unique persistentId.
- * @param persistentId the value of the persistent ID to use for this tailable consumer
+ * One tail tracking collection can host many trackers for several tailable
+ * consumers. To keep them separate, each tracker should have its own unique
+ * persistentId.
+ *
+ * @param persistentId the value of the persistent ID to use for this
+ * tailable consumer
*/
public void setPersistentId(String persistentId) {
this.persistentId = persistentId;
@@ -507,8 +613,10 @@ public class MongoDbEndpoint extends DefaultEndpoint {
}
/**
- * In write operations, it determines whether instead of returning {@link WriteResult} as the body of the OUT
- * message, we transfer the IN message to the OUT and attach the WriteResult as a header.
+ * In write operations, it determines whether instead of returning
+ * {@link WriteResult} as the body of the OUT message, we transfer the IN
+ * message to the OUT and attach the WriteResult as a header.
+ *
* @param writeResultAsHeader flag to indicate if this option is enabled
*/
public void setWriteResultAsHeader(boolean writeResultAsHeader) {
http://git-wip-us.apache.org/repos/asf/camel/blob/d5f42992/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
index 1bf5a45..c18e696 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
@@ -67,17 +67,19 @@ public class MongoDbProducer extends DefaultProducer {
throw new CamelMongoDbException("Operation specified on header is not supported. Value: " + header, e);
}
}
-
+
try {
invokeOperation(operation, exchange);
} catch (Exception e) {
throw MongoDbComponent.wrapInCamelMongoDbException(e);
}
-
+
}
/**
- * Entry method that selects the appropriate MongoDB operation and executes it
+ * Entry method that selects the appropriate MongoDB operation and executes
+ * it
+ *
* @param operation
* @param exchange
* @throws Exception
@@ -87,7 +89,7 @@ public class MongoDbProducer extends DefaultProducer {
case count:
doCount(exchange);
break;
-
+
case findOneByQuery:
doFindOneByQuery(exchange);
break;
@@ -134,14 +136,15 @@ public class MongoDbProducer extends DefaultProducer {
}
// ----------- MongoDB operations ----------------
-
+
protected void doGetStats(Exchange exchange, MongoDbOperation operation) throws Exception {
DBObject result = null;
-
+
if (operation == MongoDbOperation.getColStats) {
result = calculateCollection(exchange).getStats();
} else if (operation == MongoDbOperation.getDbStats) {
- // if it's a DB, also take into account the dynamicity option and the DB that is used
+ // if it's a DB, also take into account the dynamicity option and
+ // the DB that is used
result = calculateCollection(exchange).getDB().getStats();
} else {
throw new CamelMongoDbException("Internal error: wrong operation for getStats variant" + operation);
@@ -154,13 +157,15 @@ public class MongoDbProducer extends DefaultProducer {
protected void doRemove(Exchange exchange) throws Exception {
DBCollection dbCol = calculateCollection(exchange);
DBObject removeObj = exchange.getIn().getMandatoryBody(DBObject.class);
-
+
WriteConcern wc = extractWriteConcern(exchange);
WriteResult result = wc == null ? dbCol.remove(removeObj) : dbCol.remove(removeObj, wc);
-
+
Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.remove);
- // we always return the WriteResult, because whether the getLastError was called or not,
- // the user will have the means to call it or obtain the cached CommandResult
+ // we always return the WriteResult, because whether the getLastError
+ // was called or not,
+ // the user will have the means to call it or obtain the cached
+ // CommandResult
processAndTransferWriteResult(result, exchange);
resultMessage.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
}
@@ -168,96 +173,106 @@ public class MongoDbProducer extends DefaultProducer {
@SuppressWarnings("unchecked")
protected void doUpdate(Exchange exchange) throws Exception {
DBCollection dbCol = calculateCollection(exchange);
- List<DBObject> saveObj = exchange.getIn().getMandatoryBody((Class<List<DBObject>>) (Class<?>) List.class);
+ List<DBObject> saveObj = exchange.getIn().getMandatoryBody((Class<List<DBObject>>)(Class<?>)List.class);
if (saveObj.size() != 2) {
throw new CamelMongoDbException("MongoDB operation = insert, failed because body is not a List of DBObject objects with size = 2");
}
-
+
DBObject updateCriteria = saveObj.get(0);
DBObject objNew = saveObj.get(1);
-
+
Boolean multi = exchange.getIn().getHeader(MongoDbConstants.MULTIUPDATE, Boolean.class);
Boolean upsert = exchange.getIn().getHeader(MongoDbConstants.UPSERT, Boolean.class);
-
+
WriteResult result;
WriteConcern wc = extractWriteConcern(exchange);
- // In API 2.7, the default upsert and multi values of update(DBObject, DBObject) are false, false, so we unconditionally invoke the
- // full-signature method update(DBObject, DBObject, boolean, boolean). However, the default behaviour may change in the future,
+ // In API 2.7, the default upsert and multi values of update(DBObject,
+ // DBObject) are false, false, so we unconditionally invoke the
+ // full-signature method update(DBObject, DBObject, boolean, boolean).
+ // However, the default behaviour may change in the future,
// so it's safer to be explicit at this level for full determinism
if (multi == null && upsert == null) {
- // for update with no multi nor upsert but with specific WriteConcern there is no update signature without multi and upsert args,
+ // for update with no multi nor upsert but with specific
+ // WriteConcern there is no update signature without multi and
+ // upsert args,
// so assume defaults
result = wc == null ? dbCol.update(updateCriteria, objNew) : dbCol.update(updateCriteria, objNew, false, false, wc);
} else {
- // we calculate the final boolean values so that if any of these parameters is null, it is resolved to false
- result = wc == null ? dbCol.update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi))
- : dbCol.update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi), wc);
+ // we calculate the final boolean values so that if any of these
+ // parameters is null, it is resolved to false
+ result = wc == null ? dbCol.update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi)) : dbCol
+ .update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi), wc);
}
-
+
Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.update);
- // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or
+ // we always return the WriteResult, because whether the getLastError
+ // was called or not, the user will have the means to call it or
// obtain the cached CommandResult
processAndTransferWriteResult(result, exchange);
resultMessage.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
}
-
+
protected void doSave(Exchange exchange) throws Exception {
DBCollection dbCol = calculateCollection(exchange);
DBObject saveObj = exchange.getIn().getMandatoryBody(DBObject.class);
-
+
WriteConcern wc = extractWriteConcern(exchange);
WriteResult result = wc == null ? dbCol.save(saveObj) : dbCol.save(saveObj, wc);
-
+
prepareResponseMessage(exchange, MongoDbOperation.save);
- // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or
+ // we always return the WriteResult, because whether the getLastError
+ // was called or not, the user will have the means to call it or
// obtain the cached CommandResult
processAndTransferWriteResult(result, exchange);
}
-
+
protected void doFindById(Exchange exchange) throws Exception {
DBCollection dbCol = calculateCollection(exchange);
Object o = exchange.getIn().getMandatoryBody();
DBObject ret;
-
+
DBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, DBObject.class);
if (fieldFilter == null) {
ret = dbCol.findOne(o);
} else {
ret = dbCol.findOne(o, fieldFilter);
}
-
+
Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.save);
resultMessage.setBody(ret);
resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings({"rawtypes", "unchecked"})
protected void doInsert(Exchange exchange) throws Exception {
DBCollection dbCol = calculateCollection(exchange);
boolean singleInsert = true;
Object insert = exchange.getIn().getBody(DBObject.class);
- // body could not be converted to DBObject, check to see if it's of type List<DBObject>
+ // body could not be converted to DBObject, check to see if it's of type
+ // List<DBObject>
if (insert == null) {
insert = exchange.getIn().getBody(List.class);
- // if the body of type List was obtained, ensure that all items are of type DBObject and cast the List to List<DBObject>
+ // if the body of type List was obtained, ensure that all items are
+ // of type DBObject and cast the List to List<DBObject>
if (insert != null) {
singleInsert = false;
- insert = attemptConvertToList((List) insert, exchange);
+ insert = attemptConvertToList((List)insert, exchange);
} else {
throw new CamelMongoDbException("MongoDB operation = insert, Body is not conversible to type DBObject nor List<DBObject>");
}
}
-
+
WriteResult result;
WriteConcern wc = extractWriteConcern(exchange);
if (singleInsert) {
- result = wc == null ? dbCol.insert((DBObject) insert) : dbCol.insert((DBObject) insert, wc);
+ result = wc == null ? dbCol.insert((DBObject)insert) : dbCol.insert((DBObject)insert, wc);
} else {
- result = wc == null ? dbCol.insert((List<DBObject>) insert) : dbCol.insert((List<DBObject>) insert, wc);
+ result = wc == null ? dbCol.insert((List<DBObject>)insert) : dbCol.insert((List<DBObject>)insert, wc);
}
-
+
Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.insert);
- // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or
+ // we always return the WriteResult, because whether the getLastError
+ // was called or not, the user will have the means to call it or
// obtain the cached CommandResult
processAndTransferWriteResult(result, exchange);
resultMessage.setBody(result);
@@ -265,21 +280,23 @@ public class MongoDbProducer extends DefaultProducer {
protected void doFindAll(Exchange exchange) throws Exception {
DBCollection dbCol = calculateCollection(exchange);
- // do not use getMandatoryBody, because if the body is empty we want to retrieve all objects in the collection
+ // do not use getMandatoryBody, because if the body is empty we want to
+ // retrieve all objects in the collection
DBObject query = null;
- // do not run around looking for a type converter unless there is a need for it
+ // do not run around looking for a type converter unless there is a need
+ // for it
if (exchange.getIn().getBody() != null) {
query = exchange.getIn().getBody(DBObject.class);
}
DBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, DBObject.class);
-
+
// get the batch size and number to skip
Integer batchSize = exchange.getIn().getHeader(MongoDbConstants.BATCH_SIZE, Integer.class);
Integer numToSkip = exchange.getIn().getHeader(MongoDbConstants.NUM_TO_SKIP, Integer.class);
Integer limit = exchange.getIn().getHeader(MongoDbConstants.LIMIT, Integer.class);
DBObject sortBy = exchange.getIn().getHeader(MongoDbConstants.SORT_BY, DBObject.class);
DBCursor ret = null;
- try {
+ try {
if (query == null && fieldFilter == null) {
ret = dbCol.find(new BasicDBObject());
} else if (fieldFilter == null) {
@@ -287,28 +304,28 @@ public class MongoDbProducer extends DefaultProducer {
} else {
ret = dbCol.find(query, fieldFilter);
}
-
+
if (sortBy != null) {
ret.sort(sortBy);
}
-
+
if (batchSize != null) {
ret.batchSize(batchSize.intValue());
}
-
+
if (numToSkip != null) {
ret.skip(numToSkip.intValue());
}
-
+
if (limit != null) {
ret.limit(limit.intValue());
}
-
+
Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.findAll);
resultMessage.setBody(ret.toArray());
resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret.count());
resultMessage.setHeader(MongoDbConstants.RESULT_PAGE_SIZE, ret.size());
-
+
} catch (Exception e) {
// rethrow the exception
throw e;
@@ -318,7 +335,7 @@ public class MongoDbProducer extends DefaultProducer {
ret.close();
}
}
-
+
}
protected void doFindOneByQuery(Exchange exchange) throws Exception {
@@ -332,7 +349,7 @@ public class MongoDbProducer extends DefaultProducer {
} else {
ret = dbCol.findOne(o, fieldFilter);
}
-
+
Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.findOneByQuery);
resultMessage.setBody(ret);
resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
@@ -405,7 +422,9 @@ public class MongoDbProducer extends DefaultProducer {
}
dbCol = dynamicCollection == null ? db.getCollection(endpoint.getCollection()) : db.getCollection(dynamicCollection);
- LOG.debug("Dynamic database and/or collection selected: {}->{}", dbCol.getDB().getName(), dbCol.getName());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dynamic database and/or collection selected: {}->{}", dbCol.getDB().getName(), dbCol.getName());
+ }
return dbCol;
}
@@ -423,18 +442,19 @@ public class MongoDbProducer extends DefaultProducer {
exchange.setException(MongoDbComponent.wrapInCamelMongoDbException(cr.getException()));
}
}
-
- // determine where to set the WriteResult: as the OUT body or as an IN message header
+
+ // determine where to set the WriteResult: as the OUT body or as an IN
+ // message header
if (endpoint.isWriteResultAsHeader()) {
exchange.getOut().setHeader(MongoDbConstants.WRITERESULT, result);
} else {
exchange.getOut().setBody(result);
}
}
-
+
private WriteConcern extractWriteConcern(Exchange exchange) throws CamelMongoDbException {
Object o = exchange.getIn().getHeader(MongoDbConstants.WRITECONCERN);
-
+
if (o == null) {
return null;
} else if (o instanceof WriteConcern) {
@@ -442,16 +462,16 @@ public class MongoDbProducer extends DefaultProducer {
} else if (o instanceof String) {
WriteConcern answer = WriteConcern.valueOf(ObjectHelper.cast(String.class, o));
if (answer == null) {
- throw new CamelMongoDbException("WriteConcern specified in the " + MongoDbConstants.WRITECONCERN
- + " header, with value " + o + " could not be resolved to a WriteConcern type");
+ throw new CamelMongoDbException("WriteConcern specified in the " + MongoDbConstants.WRITECONCERN + " header, with value " + o
+ + " could not be resolved to a WriteConcern type");
}
}
-
+
// should never get here
LOG.warn("A problem occurred while resolving the Exchange's Write Concern");
return null;
}
-
+
@SuppressWarnings("rawtypes")
private List<DBObject> attemptConvertToList(List insertList, Exchange exchange) throws CamelMongoDbException {
List<DBObject> dbObjectList = new ArrayList<DBObject>(insertList.size());
@@ -466,7 +486,7 @@ public class MongoDbProducer extends DefaultProducer {
}
return dbObjectList;
}
-
+
private Message prepareResponseMessage(Exchange exchange, MongoDbOperation operation) {
Message answer = exchange.getOut();
MessageHelper.copyHeaders(exchange.getIn(), answer, false);
@@ -475,9 +495,9 @@ public class MongoDbProducer extends DefaultProducer {
}
return answer;
}
-
+
private boolean isWriteOperation(MongoDbOperation operation) {
return MongoDbComponent.WRITE_OPERATIONS.contains(operation);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d5f42992/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java
new file mode 100644
index 0000000..25468f5
--- /dev/null
+++ b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.WriteResult;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class MongoDbIndexTest extends AbstractMongoDbTest {
+
+ @Test
+ public void testInsertDynamicityEnabledDBAndCollectionAndIndex() {
+ assertEquals(0, testCollection.count());
+ mongo.getDB("otherDB").dropDatabase();
+ db.getCollection("otherCollection").drop();
+ assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
+
+ String body = "{\"_id\": \"testInsertDynamicityEnabledDBAndCollection\", \"a\" : \"1\", \"b\" : \"2\"}";
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(MongoDbConstants.DATABASE, "otherDB");
+ headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+
+ List<DBObject> objIndex = new ArrayList<DBObject>();
+ DBObject index1 = new BasicDBObject();
+ index1.put("a", 1);
+ DBObject index2 = new BasicDBObject();
+ index2.put("b", -1);
+ objIndex.add(index1);
+ objIndex.add(index2);
+ headers.put(MongoDbConstants.COLLECTION_INDEX, objIndex);
+
+ Object result = template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+ assertEquals("Response isn't of type WriteResult", WriteResult.class, result.getClass());
+
+ DBCollection dynamicCollection = mongo.getDB("otherDB").getCollection("otherCollection");
+
+ List<DBObject> indexInfos = dynamicCollection.getIndexInfo();
+
+ BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key");
+ BasicDBObject key2 = (BasicDBObject) indexInfos.get(2).get("key");
+
+ assertTrue("No index on the field a", key1.containsField("a") && "1".equals(key1.getString("a")));
+ assertTrue("No index on the field b", key2.containsField("b") && "-1".equals(key2.getString("b")));
+
+ DBObject b = dynamicCollection.findOne("testInsertDynamicityEnabledDBAndCollection");
+ assertNotNull("No record with 'testInsertDynamicityEnabledDBAndCollection' _id", b);
+
+ b = testCollection.findOne("testInsertDynamicityEnabledDBOnly");
+ assertNull("There is a record with 'testInsertDynamicityEnabledDBAndCollection' _id in the test collection", b);
+
+ assertTrue("The otherDB database should exist", mongo.getDatabaseNames().contains("otherDB"));
+ }
+
+ @Test
+ public void testInsertDynamicityEnabledCollectionAndIndex() {
+ assertEquals(0, testCollection.count());
+ mongo.getDB("otherDB").dropDatabase();
+ db.getCollection("otherCollection").drop();
+ assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
+
+ String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionAndIndex\", \"a\" : \"1\", \"b\" : \"2\"}";
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+
+ List<DBObject> objIndex = new ArrayList<DBObject>();
+ DBObject index1 = new BasicDBObject();
+ index1.put("a", 1);
+ DBObject index2 = new BasicDBObject();
+ index2.put("b", -1);
+ objIndex.add(index1);
+ objIndex.add(index2);
+ headers.put(MongoDbConstants.COLLECTION_INDEX, objIndex);
+
+ Object result = template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+ assertEquals("Response isn't of type WriteResult", WriteResult.class, result.getClass());
+
+ DBCollection dynamicCollection = db.getCollection("otherCollection");
+
+ List<DBObject> indexInfos = dynamicCollection.getIndexInfo();
+
+ BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key");
+ BasicDBObject key2 = (BasicDBObject) indexInfos.get(2).get("key");
+
+ assertTrue("No index on the field a", key1.containsField("a") && "1".equals(key1.getString("a")));
+ assertTrue("No index on the field b", key2.containsField("b") && "-1".equals(key2.getString("b")));
+
+ DBObject b = dynamicCollection.findOne("testInsertDynamicityEnabledCollectionAndIndex");
+ assertNotNull("No record with 'testInsertDynamicityEnabledCollectionAndIndex' _id", b);
+
+ b = testCollection.findOne("testInsertDynamicityEnabledDBOnly");
+ assertNull("There is a record with 'testInsertDynamicityEnabledDBAndCollection' _id in the test collection", b);
+
+ assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
+ }
+
+ @Test
+ public void testInsertDynamicityEnabledCollectionOnlyAndURIIndex() {
+ assertEquals(0, testCollection.count());
+ mongo.getDB("otherDB").dropDatabase();
+ db.getCollection("otherCollection").drop();
+ assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
+
+ String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionOnlyAndURIIndex\", \"a\" : \"1\", \"b\" : \"2\"}";
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+
+ Object result = template.requestBodyAndHeaders("direct:dynamicityEnabledWithIndexUri", body, headers);
+
+ assertEquals("Response isn't of type WriteResult", WriteResult.class, result.getClass());
+
+ DBCollection dynamicCollection = db.getCollection("otherCollection");
+ List<DBObject> indexInfos = dynamicCollection.getIndexInfo();
+
+ BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key");
+
+ assertFalse("No index on the field a", key1.containsField("a") && "-1".equals(key1.getString("a")));
+
+ DBObject b = dynamicCollection.findOne("testInsertDynamicityEnabledCollectionOnlyAndURIIndex");
+ assertNotNull("No record with 'testInsertDynamicityEnabledCollectionOnlyAndURIIndex' _id", b);
+
+ b = testCollection.findOne("testInsertDynamicityEnabledCollectionOnlyAndURIIndex");
+ assertNull("There is a record with 'testInsertDynamicityEnabledCollectionOnlyAndURIIndex' _id in the test collection", b);
+
+ assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
+ }
+
+ @Test
+ public void testInsertAutoCreateCollectionAndURIIndex() {
+ assertEquals(0, testCollection.count());
+ db.getCollection("otherCollection").remove(new BasicDBObject());
+
+ String body = "{\"_id\": \"testInsertAutoCreateCollectionAndURIIndex\", \"a\" : \"1\", \"b\" : \"2\"}";
+ Map<String, Object> headers = new HashMap<String, Object>();
+
+ Object result = template.requestBodyAndHeaders("direct:dynamicityDisabled", body, headers);
+ assertEquals("Response isn't of type WriteResult", WriteResult.class, result.getClass());
+
+ DBCollection collection = db.getCollection("otherCollection");
+ List<DBObject> indexInfos = collection.getIndexInfo();
+
+ BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key");
+ BasicDBObject key2 = (BasicDBObject) indexInfos.get(2).get("key");
+
+ assertTrue("No index on the field b", key1.containsField("b") && "-1".equals(key1.getString("b")));
+ assertTrue("No index on the field a", key2.containsField("a") && "1".equals(key2.getString("a")));
+
+ DBObject b = collection.findOne("testInsertAutoCreateCollectionAndURIIndex");
+ assertNotNull("No record with 'testInsertAutoCreateCollectionAndURIIndex' _id", b);
+
+ b = testCollection.findOne("testInsertAutoCreateCollectionAndURIIndex");
+ assertNull("There is a record with 'testInsertAutoCreateCollectionAndURIIndex' _id in the test collection", b);
+
+ assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:dynamicityEnabled")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=true&writeConcern=SAFE");
+ from("direct:dynamicityEnabledWithIndexUri")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&collectionIndex={\"a\":\"1\"}&operation=insert&dynamicity=true&writeConcern=SAFE");
+ from("direct:dynamicityDisabled")
+ .to("mongodb:myDb?database={{mongodb.testDb}}&collection=otherCollection&collectionIndex={\"a\":\"1\",\"b\":\"-1\"}&operation=insert&dynamicity=false&writeConcern=SAFE");
+ }
+ };
+ }
+}