Posted to commits@camel.apache.org by da...@apache.org on 2013/08/07 18:33:50 UTC

[2/3] git commit: CAMEL-6000: Add collectionIndex option to camel-mongodb. Thanks to Pierre-Yves Nicolas for the patch.

CAMEL-6000: Add collectionIndex option to camel-mongodb. Thanks to Pierre-Yves Nicolas for the patch.
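
For illustration only (not part of the patch), a minimal route sketch showing how the new collectionIndex URI option can be used, modelled on the routes added in the new MongoDbIndexTest further down; the "myDb" connection bean name and the database/collection names are placeholders:

    import org.apache.camel.builder.RouteBuilder;

    public class CollectionIndexRouteExample extends RouteBuilder {
        @Override
        public void configure() {
            // "myDb" is assumed to be a com.mongodb.Mongo bean registered in the Camel registry.
            // On endpoint initialisation the component drops the existing indexes on the bound
            // collection and creates one index per entry of the collectionIndex JSON document.
            from("direct:insertWithIndex")
                .to("mongodb:myDb?database=testDb&collection=records"
                    + "&collectionIndex={\"a\":\"1\",\"b\":\"-1\"}"
                    + "&operation=insert");
        }
    }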


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d5f42992
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d5f42992
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d5f42992

Branch: refs/heads/master
Commit: d5f429924c16418a9ceb672d40f01c2743b48af5
Parents: 9ecc122
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Aug 7 16:43:24 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Aug 7 18:33:33 2013 +0200

----------------------------------------------------------------------
 .../component/mongodb/MongoDbConstants.java     |  10 +-
 .../component/mongodb/MongoDbEndpoint.java      | 280 +++++++++++++------
 .../component/mongodb/MongoDbProducer.java      | 146 +++++-----
 .../component/mongodb/MongoDbIndexTest.java     | 193 +++++++++++++
 4 files changed, 476 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d5f42992/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
index 784a4e9..3eff7c5 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
@@ -17,8 +17,8 @@
 package org.apache.camel.component.mongodb;
 
 public final class MongoDbConstants {
-    
-    public static final String OPERATION_HEADER = "CamelMongoDbOperation";    
+
+    public static final String OPERATION_HEADER = "CamelMongoDbOperation";
     public static final String RESULT_TOTAL_SIZE = "CamelMongoDbResultTotalSize";
     public static final String RESULT_PAGE_SIZE = "CamelMongoDbResultPageSize";
     public static final String FIELDS_FILTER = "CamelMongoDbFieldsFilter";
@@ -32,11 +32,13 @@ public final class MongoDbConstants {
     public static final String SORT_BY = "CamelMongoDbSortBy";
     public static final String DATABASE = "CamelMongoDbDatabase";
     public static final String COLLECTION = "CamelMongoDbCollection";
+    public static final String COLLECTION_INDEX = "CamelMongoDbCollectionIndex";
     public static final String WRITECONCERN = "CamelMongoDbWriteConcern";
     public static final String LIMIT = "CamelMongoDbLimit";
     public static final String FROM_TAILABLE = "CamelMongoDbTailable";
     public static final String WRITERESULT = "CamelMongoWriteResult";
 
-    private MongoDbConstants() { }
-    
+    private MongoDbConstants() {
+    }
+
 }
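
For the new COLLECTION_INDEX header above, a small usage sketch (not part of the patch) mirroring the new MongoDbIndexTest further down; the "direct:dynamicityEnabled" route and the collection name are taken from that test and are otherwise placeholders:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import com.mongodb.BasicDBObject;
    import com.mongodb.DBObject;
    import org.apache.camel.CamelContext;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.component.mongodb.MongoDbConstants;

    public final class CollectionIndexHeaderExample {

        private CollectionIndexHeaderExample() {
        }

        // Sends one insert and asks for an ascending index on "a" and a descending
        // index on "b" via the new CamelMongoDbCollectionIndex header.
        public static Object insertWithIndex(CamelContext context, String jsonBody) {
            List<DBObject> objIndex = new ArrayList<DBObject>();
            objIndex.add(new BasicDBObject("a", 1));
            objIndex.add(new BasicDBObject("b", -1));

            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put(MongoDbConstants.COLLECTION, "otherCollection");
            headers.put(MongoDbConstants.COLLECTION_INDEX, objIndex);

            ProducerTemplate template = context.createProducerTemplate();
            return template.requestBodyAndHeaders("direct:dynamicityEnabled", jsonBody, headers);
        }
    }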

http://git-wip-us.apache.org/repos/asf/camel/blob/d5f42992/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index 97d410e..c216ccb 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -16,6 +16,13 @@
  */
 package org.apache.camel.component.mongodb;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.mongodb.BasicDBObject;
 import com.mongodb.DB;
 import com.mongodb.DBCollection;
 import com.mongodb.DBObject;
@@ -23,7 +30,6 @@ import com.mongodb.Mongo;
 import com.mongodb.ReadPreference;
 import com.mongodb.WriteConcern;
 import com.mongodb.WriteResult;
-
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -33,14 +39,14 @@ import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.util.ObjectHelper;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Represents a MongoDb endpoint. 
- * It is responsible for creating {@link MongoDbProducer} and {@link MongoDbTailableCursorConsumer} instances.
- * It accepts a number of options to customise the behaviour of consumers and producers.
+ * Represents a MongoDb endpoint. It is responsible for creating
+ * {@link MongoDbProducer} and {@link MongoDbTailableCursorConsumer} instances.
+ * It accepts a number of options to customise the behaviour of consumers and
+ * producers.
  */
 public class MongoDbEndpoint extends DefaultEndpoint {
 
@@ -48,6 +54,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     private Mongo mongoConnection;
     private String database;
     private String collection;
+    private String collectionIndex;
     private MongoDbOperation operation;
     private boolean createCollection = true;
     private boolean invokeGetLastError; // = false
@@ -60,16 +67,16 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     private MongoDbConsumerType consumerType;
     private long cursorRegenerationDelay = 1000L;
     private String tailTrackIncreasingField;
-    
+
     // persitent tail tracking
     private boolean persistentTailTracking; // = false;
     private String persistentId;
     private String tailTrackDb;
     private String tailTrackCollection;
     private String tailTrackField;
-    
+
     private MongoDbTailTrackingConfig tailTrackingConfig;
-    
+
     private DBCollection dbCollection;
     private DB db;
 
@@ -88,7 +95,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     // ======= Implementation methods =====================================
-    
+
     public Producer createProducer() throws Exception {
         validateOptions('P');
         initializeConnection();
@@ -100,12 +107,12 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         // we never create the collection
         createCollection = false;
         initializeConnection();
-                
+
         // select right consumer type
         if (consumerType == null) {
             consumerType = MongoDbConsumerType.tailable;
         }
-        
+
         Consumer consumer;
         if (consumerType == MongoDbConsumerType.tailable) {
             consumer = new MongoDbTailableCursorConsumer(this, processor);
@@ -121,17 +128,17 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         // make our best effort to validate, options with defaults are checked against their defaults, which is not always a guarantee that
         // they haven't been explicitly set, but it is enough
         if (role == 'P') {
-            if (!ObjectHelper.isEmpty(consumerType) || persistentTailTracking || !ObjectHelper.isEmpty(tailTrackDb) 
+            if (!ObjectHelper.isEmpty(consumerType) || persistentTailTracking || !ObjectHelper.isEmpty(tailTrackDb)
                     || !ObjectHelper.isEmpty(tailTrackCollection) || !ObjectHelper.isEmpty(tailTrackField) || cursorRegenerationDelay != 1000L) {
                 throw new IllegalArgumentException("consumerType, tailTracking, cursorRegenerationDelay options cannot appear on a producer endpoint");
             }
         } else if (role == 'C') {
-            if (!ObjectHelper.isEmpty(operation) || !ObjectHelper.isEmpty(writeConcern) || writeConcernRef != null 
+            if (!ObjectHelper.isEmpty(operation) || !ObjectHelper.isEmpty(writeConcern) || writeConcernRef != null
                     || readPreference != null || dynamicity || invokeGetLastError) {
-                throw new IllegalArgumentException("operation, writeConcern, writeConcernRef, readPreference, dynamicity, invokeGetLastError " 
+                throw new IllegalArgumentException("operation, writeConcern, writeConcernRef, readPreference, dynamicity, invokeGetLastError "
                         + "options cannot appear on a consumer endpoint");
             }
-            
+
             if (consumerType == MongoDbConsumerType.tailable) {
                 if (tailTrackIncreasingField == null) {
                     throw new IllegalArgumentException("tailTrackIncreasingField option must be set for tailable cursor MongoDB consumer endpoint");
@@ -140,7 +147,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
                     throw new IllegalArgumentException("persistentId is compulsory for persistent tail tracking");
                 }
             }
-            
+
         } else {
             throw new IllegalArgumentException("Unknown endpoint role");
         }
@@ -149,9 +156,11 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     public boolean isSingleton() {
         return true;
     }
-    
+
     /**
-     * Initialises the MongoDB connection using the Mongo object provided to the endpoint
+     * Initialises the MongoDB connection using the Mongo object provided to the
+     * endpoint
+     *
      * @throws CamelMongoDbException
      */
     public void initializeConnection() throws CamelMongoDbException {
@@ -167,20 +176,66 @@ public class MongoDbEndpoint extends DefaultEndpoint {
             throw new CamelMongoDbException("Could not initialise MongoDbComponent. Collection " + collection + " and createCollection is false.");
         }
         dbCollection = db.getCollection(collection);
-        
-        LOG.info("MongoDb component initialised and endpoint bound to MongoDB collection with the following paramters. Address list: {}, Db: {}, Collection: {}", 
-                new Object[] {mongoConnection.getAllAddress().toString(), db.getName(), dbCollection.getName()});
+
+        LOG.debug("MongoDb component initialised and endpoint bound to MongoDB collection with the following parameters. Address list: {}, Db: {}, Collection: {}",
+                new Object[]{mongoConnection.getAllAddress().toString(), db.getName(), dbCollection.getName()});
+
+        try {
+            if (ObjectHelper.isNotEmpty(collectionIndex)) {
+                ensureIndex(dbCollection, createIndex());
+            }
+        } catch (Exception e) {
+            throw new CamelMongoDbException("Error creating index", e);
+        }
+    }
+
+    /**
+     * Add Index
+     *
+     * @param collection
+     */
+    public void ensureIndex(DBCollection collection, List<DBObject> dynamicIndex) {
+        collection.dropIndexes();
+        if (dynamicIndex != null && !dynamicIndex.isEmpty()) {
+            for (DBObject index : dynamicIndex) {
+                LOG.debug("create BDObject Index {}", index);
+                collection.ensureIndex(index);
+            }
+        }
+    }
+
+    /**
+     * Create technical list index
+     *
+     * @return technical list index
+     */
+    @SuppressWarnings("unchecked")
+    public List<DBObject> createIndex() throws Exception {
+        List<DBObject> indexList = new ArrayList<DBObject>();
+
+        if (ObjectHelper.isNotEmpty(collectionIndex)) {
+            HashMap<String, String> indexMap = new ObjectMapper().readValue(collectionIndex, HashMap.class);
+
+            for (Map.Entry<String, String> set : indexMap.entrySet()) {
+                DBObject index = new BasicDBObject();
+                index.put(set.getKey(), set.getValue());
+
+                indexList.add(index);
+            }
+        }
+        return indexList;
     }
 
     /**
-     * Applies validation logic specific to this endpoint type. If everything succeeds, continues initialization
+     * Applies validation logic specific to this endpoint type. If everything
+     * succeeds, continues initialization
      */
     @Override
     protected void doStart() throws Exception {
         if (writeConcern != null && writeConcernRef != null) {
-            LOG.error("Cannot set both writeConcern and writeConcernRef at the same time. Respective values: {}, {}. "
-                    + "Aborting initialization.", new Object[] {writeConcern, writeConcernRef});
-            throw new IllegalArgumentException("Cannot set both writeConcern and writeConcernRef at the same time on MongoDB endpoint");
+            String msg = "Cannot set both writeConcern and writeConcernRef at the same time. Respective values: " + writeConcern
+                    + ", " + writeConcernRef + ". Aborting initialization.";
+            throw new IllegalArgumentException(msg);
         }
 
         setWriteReadOptionsOnConnection();
@@ -193,12 +248,12 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         message.setHeader(MongoDbConstants.DATABASE, database);
         message.setHeader(MongoDbConstants.COLLECTION, collection);
         message.setHeader(MongoDbConstants.FROM_TAILABLE, true);
-        
+
         message.setBody(dbObj);
         exchange.setIn(message);
         return exchange;
     }
-    
+
     private void setWriteReadOptionsOnConnection() {
         // Set the WriteConcern
         if (writeConcern != null) {
@@ -206,18 +261,19 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         } else if (writeConcernRef != null) {
             mongoConnection.setWriteConcern(writeConcernRef);
         }
-        
+
         // Set the ReadPreference
         if (readPreference != null) {
             mongoConnection.setReadPreference(readPreference);
         }
     }
-    
-    
-    // ======= Getters and setters ===============================================
-    
+
+    // ======= Getters and setters
+    // ===============================================
+
     /**
      * Sets the name of the MongoDB collection to bind to this endpoint
+     *
      * @param collection collection name
      */
     public void setCollection(String collection) {
@@ -229,7 +285,21 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Sets the operation this endpoint will execute against MongoDB. For possible values, see {@link MongoDbOperation}.
+     * Sets the collection index (JSON FORMAT : { "field1" : "order", "field2" :
+     * "order"})
+     */
+    public void setCollectionIndex(String collectionIndex) {
+        this.collectionIndex = collectionIndex;
+    }
+
+    public String getCollectionIndex() {
+        return collectionIndex;
+    }
+
+    /**
+     * Sets the operation this endpoint will execute against MongoDB. For
+     * possible values, see {@link MongoDbOperation}.
+     *
      * @param operation name of the operation as per catalogued values
      * @throws CamelMongoDbException
      */
@@ -247,6 +317,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
 
     /**
      * Sets the name of the MongoDB database to target
+     *
      * @param database name of the MongoDB database
      */
     public void setDatabase(String database) {
@@ -258,7 +329,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Create collection during initialisation if it doesn't exist. Default is true.
+     * Create collection during initialisation if it doesn't exist. Default is
+     * true.
+     *
      * @param createCollection true or false
      */
     public void setCreateCollection(boolean createCollection) {
@@ -276,9 +349,10 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     public DBCollection getDbCollection() {
         return dbCollection;
     }
-    
+
     /**
      * Sets the Mongo instance that represents the backing connection
+     *
      * @param mongoConnection the connection to the database
      */
     public void setMongoConnection(Mongo mongoConnection) {
@@ -290,10 +364,14 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Set the {@link WriteConcern} for write operations on MongoDB using the standard ones.
-     * Resolved from the fields of the WriteConcern class by calling the {@link WriteConcern#valueOf(String)} method.
+     * Set the {@link WriteConcern} for write operations on MongoDB using the
+     * standard ones. Resolved from the fields of the WriteConcern class by
+     * calling the {@link WriteConcern#valueOf(String)} method.
+     *
      * @param writeConcern the standard name of the WriteConcern
-     * @see <a href="http://api.mongodb.org/java/current/com/mongodb/WriteConcern.html#valueOf(java.lang.String)">possible options</a>
+     * @see <a
+     *      href="http://api.mongodb.org/java/current/com/mongodb/WriteConcern.html#valueOf(java.lang.String)">possible
+     *      options</a>
      */
     public void setWriteConcern(String writeConcern) {
         this.writeConcern = WriteConcern.valueOf(writeConcern);
@@ -304,9 +382,11 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Instructs this endpoint to invoke {@link WriteResult#getLastError()} with every operation. By default, MongoDB does not wait
-     * for the write operation to occur before returning. If set to true, each exchange will only return after the write operation 
-     * has actually occurred in MongoDB.
+     * Instructs this endpoint to invoke {@link WriteResult#getLastError()} with
+     * every operation. By default, MongoDB does not wait for the write
+     * operation to occur before returning. If set to true, each exchange will
+     * only return after the write operation has actually occurred in MongoDB.
+     *
      * @param invokeGetLastError true or false
      */
     public void setInvokeGetLastError(boolean invokeGetLastError) {
@@ -320,16 +400,17 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     /**
      * Set the {@link WriteConcern} for write operations on MongoDB, passing in the bean ref to a custom WriteConcern which exists in the Registry.
      * You can also use standard WriteConcerns by passing in their key. See the {@link #setWriteConcern(String) setWriteConcern} method.
+     *
      * @param writeConcernRef the name of the bean in the registry that represents the WriteConcern to use
      */
     public void setWriteConcernRef(String writeConcernRef) {
         WriteConcern wc = this.getCamelContext().getRegistry().lookupByNameAndType(writeConcernRef, WriteConcern.class);
         if (wc == null) {
-            LOG.error("Camel MongoDB component could not find the WriteConcern in the Registry. Verify that the " 
-                    + "provided bean name ({}) is correct. Aborting initialization.", writeConcernRef);
-            throw new IllegalArgumentException("Camel MongoDB component could not find the WriteConcern in the Registry");   
+            String msg = "Camel MongoDB component could not find the WriteConcern in the Registry. Verify that the "
+                    + "provided bean name (" + writeConcernRef + ")  is correct. Aborting initialization.";
+            throw new IllegalArgumentException(msg);
         }
-    
+
         this.writeConcernRef = wc;
     }
 
@@ -337,9 +418,11 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         return writeConcernRef;
     }
 
-    /** 
-     * Sets a MongoDB {@link ReadPreference} on the Mongo connection. Read preferences set directly on the connection will be
-     * overridden by this setting.
+    /**
+     * Sets a MongoDB {@link ReadPreference} on the Mongo connection. Read
+     * preferences set directly on the connection will be overridden by this
+     * setting.
+     *
      * @param readPreference the bean name of the read preference to set
      */
     public void setReadPreference(String readPreference) {
@@ -354,23 +437,28 @@ public class MongoDbEndpoint extends DefaultEndpoint {
                 break;
             }
         }
-        
-        LOG.error("Could not resolve specified ReadPreference of type {}. Read preferences are resolved from inner " 
-                + "classes of com.mongodb.ReadPreference.", readPreference);
-        throw new IllegalArgumentException("MongoDB endpoint could not resolve specified ReadPreference");
+
+        String msg = "Could not resolve specified ReadPreference of type " + readPreference
+                + ". Read preferences are resolved from inner classes of com.mongodb.ReadPreference.";
+        throw new IllegalArgumentException(msg);
     }
-    
+
     public ReadPreference getReadPreference() {
         return readPreference;
     }
 
     /**
-     * Sets whether this endpoint will attempt to dynamically resolve the target database and collection from the incoming Exchange properties.
-     * Can be used to override at runtime the database and collection specified on the otherwise static endpoint URI.
-     * It is disabled by default to boost performance. Enabling it will take a minimal performance hit.
+     * Sets whether this endpoint will attempt to dynamically resolve the target
+     * database and collection from the incoming Exchange properties. Can be
+     * used to override at runtime the database and collection specified on the
+     * otherwise static endpoint URI. It is disabled by default to boost
+     * performance. Enabling it will take a minimal performance hit.
+     *
+     * @param dynamicity true or false indicated whether target database and
+     *                   collection should be calculated dynamically based on Exchange
+     *                   properties.
      * @see MongoDbConstants#DATABASE
      * @see MongoDbConstants#COLLECTION
-     * @param dynamicity true or false indicated whether target database and collection should be calculated dynamically based on Exchange properties.
      */
     public void setDynamicity(boolean dynamicity) {
         this.dynamicity = dynamicity;
@@ -381,7 +469,8 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Reserved for future use, when more consumer types are supported. 
+     * Reserved for future use, when more consumer types are supported.
+     *
      * @param consumerType key of the consumer type
      * @throws CamelMongoDbException
      */
@@ -396,15 +485,17 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     public MongoDbConsumerType getConsumerType() {
         return consumerType;
     }
-    
+
     public String getTailTrackDb() {
         return tailTrackDb;
     }
 
     /**
-     * Indicates what database the tail tracking mechanism will persist to. If not specified, the current database will 
-     * be picked by default. Dynamicity will not be taken into account even if enabled, i.e. the tail tracking database 
-     * will not vary past endpoint initialisation.
+     * Indicates what database the tail tracking mechanism will persist to. If
+     * not specified, the current database will be picked by default. Dynamicity
+     * will not be taken into account even if enabled, i.e. the tail tracking
+     * database will not vary past endpoint initialisation.
+     *
      * @param tailTrackDb database name
      */
     public void setTailTrackDb(String tailTrackDb) {
@@ -416,8 +507,10 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Collection where tail tracking information will be persisted. If not specified, {@link MongoDbTailTrackingConfig#DEFAULT_COLLECTION} 
-     * will be used by default.
+     * Collection where tail tracking information will be persisted. If not
+     * specified, {@link MongoDbTailTrackingConfig#DEFAULT_COLLECTION} will be
+     * used by default.
+     *
      * @param tailTrackCollection collection name
      */
     public void setTailTrackCollection(String tailTrackCollection) {
@@ -429,8 +522,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Field where the last tracked value will be placed. If not specified,  {@link MongoDbTailTrackingConfig#DEFAULT_FIELD} 
-     * will be used by default.
+     * Field where the last tracked value will be placed. If not specified,
+     * {@link MongoDbTailTrackingConfig#DEFAULT_FIELD} will be used by default.
+     *
      * @param tailTrackField field name
      */
     public void setTailTrackField(String tailTrackField) {
@@ -438,8 +532,11 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Enable persistent tail tracking, which is a mechanism to keep track of the last consumed message across system restarts.
-     * The next time the system is up, the endpoint will recover the cursor from the point where it last stopped slurping records.
+     * Enable persistent tail tracking, which is a mechanism to keep track of
+     * the last consumed message across system restarts. The next time the
+     * system is up, the endpoint will recover the cursor from the point where
+     * it last stopped slurping records.
+     *
      * @param persistentTailTracking true or false
      */
     public void setPersistentTailTracking(boolean persistentTailTracking) {
@@ -449,14 +546,16 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     public boolean isPersistentTailTracking() {
         return persistentTailTracking;
     }
-    
+
     /**
-     * Correlation field in the incoming record which is of increasing nature and will be used to position the tailing cursor every 
-     * time it is generated.
-     * The cursor will be (re)created with a query of type: tailTrackIncreasingField > lastValue (possibly recovered from persistent
-     * tail tracking).
-     * Can be of type Integer, Date, String, etc.
-     * NOTE: No support for dot notation at the current time, so the field should be at the top level of the document.
+     * Correlation field in the incoming record which is of increasing nature
+     * and will be used to position the tailing cursor every time it is
+     * generated. The cursor will be (re)created with a query of type:
+     * tailTrackIncreasingField > lastValue (possibly recovered from persistent
+     * tail tracking). Can be of type Integer, Date, String, etc. NOTE: No
+     * support for dot notation at the current time, so the field should be at
+     * the top level of the document.
+     *
      * @param tailTrackIncreasingField
      */
     public void setTailTrackIncreasingField(String tailTrackIncreasingField) {
@@ -469,16 +568,20 @@ public class MongoDbEndpoint extends DefaultEndpoint {
 
     public MongoDbTailTrackingConfig getTailTrackingConfig() {
         if (tailTrackingConfig == null) {
-            tailTrackingConfig = new MongoDbTailTrackingConfig(persistentTailTracking, tailTrackIncreasingField, 
-                    tailTrackDb == null ? database : tailTrackDb, tailTrackCollection, tailTrackField, getPersistentId());
+            tailTrackingConfig = new MongoDbTailTrackingConfig(persistentTailTracking, tailTrackIncreasingField, tailTrackDb == null ? database : tailTrackDb, tailTrackCollection,
+                    tailTrackField, getPersistentId());
         }
-        return tailTrackingConfig;       
+        return tailTrackingConfig;
     }
 
     /**
-     * MongoDB tailable cursors will block until new data arrives. If no new data is inserted, after some time the cursor will be automatically
-     * freed and closed by the MongoDB server. The client is expected to regenerate the cursor if needed. This value specifies the time to wait
-     * before attempting to fetch a new cursor, and if the attempt fails, how long before the next attempt is made. Default value is 1000ms.
+     * MongoDB tailable cursors will block until new data arrives. If no new
+     * data is inserted, after some time the cursor will be automatically freed
+     * and closed by the MongoDB server. The client is expected to regenerate
+     * the cursor if needed. This value specifies the time to wait before
+     * attempting to fetch a new cursor, and if the attempt fails, how long
+     * before the next attempt is made. Default value is 1000ms.
+     *
      * @param cursorRegenerationDelay delay specified in milliseconds
      */
     public void setCursorRegenerationDelay(long cursorRegenerationDelay) {
@@ -490,9 +593,12 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * One tail tracking collection can host many trackers for several tailable consumers. 
-     * To keep them separate, each tracker should have its own unique persistentId.
-     * @param persistentId the value of the persistent ID to use for this tailable consumer
+     * One tail tracking collection can host many trackers for several tailable
+     * consumers. To keep them separate, each tracker should have its own unique
+     * persistentId.
+     *
+     * @param persistentId the value of the persistent ID to use for this
+     *                     tailable consumer
      */
     public void setPersistentId(String persistentId) {
         this.persistentId = persistentId;
@@ -507,8 +613,10 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * In write operations, it determines whether instead of returning {@link WriteResult} as the body of the OUT
-     * message, we transfer the IN message to the OUT and attach the WriteResult as a header.
+     * In write operations, it determines whether instead of returning
+     * {@link WriteResult} as the body of the OUT message, we transfer the IN
+     * message to the OUT and attach the WriteResult as a header.
+     *
      * @param writeResultAsHeader flag to indicate if this option is enabled
      */
     public void setWriteResultAsHeader(boolean writeResultAsHeader) {

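A standalone sketch (not part of the patch) of what the new createIndex() method above does with the collectionIndex option value: the JSON document is read into a map with Jackson and each entry becomes a single-field index specification that ensureIndex() then applies:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.mongodb.BasicDBObject;
    import com.mongodb.DBObject;

    public final class CollectionIndexParseExample {

        private CollectionIndexParseExample() {
        }

        // Mirrors MongoDbEndpoint.createIndex(): each key/value pair of the JSON
        // document becomes one single-field index specification.
        public static List<DBObject> parse(String collectionIndex) throws Exception {
            List<DBObject> indexList = new ArrayList<DBObject>();
            @SuppressWarnings("unchecked")
            HashMap<String, String> indexMap = new ObjectMapper().readValue(collectionIndex, HashMap.class);
            for (Map.Entry<String, String> entry : indexMap.entrySet()) {
                indexList.add(new BasicDBObject(entry.getKey(), entry.getValue()));
            }
            return indexList;
        }

        public static void main(String[] args) throws Exception {
            // prints e.g. [{ "a" : "1"}, { "b" : "-1"}]
            System.out.println(parse("{\"a\":\"1\",\"b\":\"-1\"}"));
        }
    }
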
http://git-wip-us.apache.org/repos/asf/camel/blob/d5f42992/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
index 1bf5a45..c18e696 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
@@ -67,17 +67,19 @@ public class MongoDbProducer extends DefaultProducer {
                 throw new CamelMongoDbException("Operation specified on header is not supported. Value: " + header, e);
             }
         }
-        
+
         try {
             invokeOperation(operation, exchange);
         } catch (Exception e) {
             throw MongoDbComponent.wrapInCamelMongoDbException(e);
         }
-        
+
     }
 
     /**
-     * Entry method that selects the appropriate MongoDB operation and executes it
+     * Entry method that selects the appropriate MongoDB operation and executes
+     * it
+     * 
      * @param operation
      * @param exchange
      * @throws Exception
@@ -87,7 +89,7 @@ public class MongoDbProducer extends DefaultProducer {
         case count:
             doCount(exchange);
             break;
-            
+
         case findOneByQuery:
             doFindOneByQuery(exchange);
             break;
@@ -134,14 +136,15 @@ public class MongoDbProducer extends DefaultProducer {
     }
 
     // ----------- MongoDB operations ----------------
-    
+
     protected void doGetStats(Exchange exchange, MongoDbOperation operation) throws Exception {
         DBObject result = null;
-        
+
         if (operation == MongoDbOperation.getColStats) {
             result = calculateCollection(exchange).getStats();
         } else if (operation == MongoDbOperation.getDbStats) {
-            // if it's a DB, also take into account the dynamicity option and the DB that is used
+            // if it's a DB, also take into account the dynamicity option and
+            // the DB that is used
             result = calculateCollection(exchange).getDB().getStats();
         } else {
             throw new CamelMongoDbException("Internal error: wrong operation for getStats variant" + operation);
@@ -154,13 +157,15 @@ public class MongoDbProducer extends DefaultProducer {
     protected void doRemove(Exchange exchange) throws Exception {
         DBCollection dbCol = calculateCollection(exchange);
         DBObject removeObj = exchange.getIn().getMandatoryBody(DBObject.class);
-        
+
         WriteConcern wc = extractWriteConcern(exchange);
         WriteResult result = wc == null ? dbCol.remove(removeObj) : dbCol.remove(removeObj, wc);
-        
+
         Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.remove);
-        // we always return the WriteResult, because whether the getLastError was called or not, 
-        // the user will have the means to call it or obtain the cached CommandResult
+        // we always return the WriteResult, because whether the getLastError
+        // was called or not,
+        // the user will have the means to call it or obtain the cached
+        // CommandResult
         processAndTransferWriteResult(result, exchange);
         resultMessage.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
     }
@@ -168,96 +173,106 @@ public class MongoDbProducer extends DefaultProducer {
     @SuppressWarnings("unchecked")
     protected void doUpdate(Exchange exchange) throws Exception {
         DBCollection dbCol = calculateCollection(exchange);
-        List<DBObject> saveObj = exchange.getIn().getMandatoryBody((Class<List<DBObject>>) (Class<?>) List.class);
+        List<DBObject> saveObj = exchange.getIn().getMandatoryBody((Class<List<DBObject>>)(Class<?>)List.class);
         if (saveObj.size() != 2) {
             throw new CamelMongoDbException("MongoDB operation = insert, failed because body is not a List of DBObject objects with size = 2");
         }
-        
+
         DBObject updateCriteria = saveObj.get(0);
         DBObject objNew = saveObj.get(1);
-        
+
         Boolean multi = exchange.getIn().getHeader(MongoDbConstants.MULTIUPDATE, Boolean.class);
         Boolean upsert = exchange.getIn().getHeader(MongoDbConstants.UPSERT, Boolean.class);
-        
+
         WriteResult result;
         WriteConcern wc = extractWriteConcern(exchange);
-        // In API 2.7, the default upsert and multi values of update(DBObject, DBObject) are false, false, so we unconditionally invoke the
-        // full-signature method update(DBObject, DBObject, boolean, boolean). However, the default behaviour may change in the future, 
+        // In API 2.7, the default upsert and multi values of update(DBObject,
+        // DBObject) are false, false, so we unconditionally invoke the
+        // full-signature method update(DBObject, DBObject, boolean, boolean).
+        // However, the default behaviour may change in the future,
         // so it's safer to be explicit at this level for full determinism
         if (multi == null && upsert == null) {
-            // for update with no multi nor upsert but with specific WriteConcern there is no update signature without multi and upsert args,
+            // for update with no multi nor upsert but with specific
+            // WriteConcern there is no update signature without multi and
+            // upsert args,
             // so assume defaults
             result = wc == null ? dbCol.update(updateCriteria, objNew) : dbCol.update(updateCriteria, objNew, false, false, wc);
         } else {
-            // we calculate the final boolean values so that if any of these parameters is null, it is resolved to false
-            result = wc == null ? dbCol.update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi)) 
-                    : dbCol.update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi), wc);
+            // we calculate the final boolean values so that if any of these
+            // parameters is null, it is resolved to false
+            result = wc == null ? dbCol.update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi)) : dbCol
+                .update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi), wc);
         }
-        
+
         Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.update);
-        // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or 
+        // we always return the WriteResult, because whether the getLastError
+        // was called or not, the user will have the means to call it or
         // obtain the cached CommandResult
         processAndTransferWriteResult(result, exchange);
         resultMessage.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
     }
-    
+
     protected void doSave(Exchange exchange) throws Exception {
         DBCollection dbCol = calculateCollection(exchange);
         DBObject saveObj = exchange.getIn().getMandatoryBody(DBObject.class);
-        
+
         WriteConcern wc = extractWriteConcern(exchange);
         WriteResult result = wc == null ? dbCol.save(saveObj) : dbCol.save(saveObj, wc);
-        
+
         prepareResponseMessage(exchange, MongoDbOperation.save);
-        // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or 
+        // we always return the WriteResult, because whether the getLastError
+        // was called or not, the user will have the means to call it or
         // obtain the cached CommandResult
         processAndTransferWriteResult(result, exchange);
     }
-    
+
     protected void doFindById(Exchange exchange) throws Exception {
         DBCollection dbCol = calculateCollection(exchange);
         Object o = exchange.getIn().getMandatoryBody();
         DBObject ret;
-        
+
         DBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, DBObject.class);
         if (fieldFilter == null) {
             ret = dbCol.findOne(o);
         } else {
             ret = dbCol.findOne(o, fieldFilter);
         }
-    
+
         Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.save);
         resultMessage.setBody(ret);
         resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @SuppressWarnings({"rawtypes", "unchecked"})
     protected void doInsert(Exchange exchange) throws Exception {
         DBCollection dbCol = calculateCollection(exchange);
         boolean singleInsert = true;
         Object insert = exchange.getIn().getBody(DBObject.class);
-        // body could not be converted to DBObject, check to see if it's of type List<DBObject>
+        // body could not be converted to DBObject, check to see if it's of type
+        // List<DBObject>
         if (insert == null) {
             insert = exchange.getIn().getBody(List.class);
-            // if the body of type List was obtained, ensure that all items are of type DBObject and cast the List to List<DBObject>
+            // if the body of type List was obtained, ensure that all items are
+            // of type DBObject and cast the List to List<DBObject>
             if (insert != null) {
                 singleInsert = false;
-                insert = attemptConvertToList((List) insert, exchange);
+                insert = attemptConvertToList((List)insert, exchange);
             } else {
                 throw new CamelMongoDbException("MongoDB operation = insert, Body is not conversible to type DBObject nor List<DBObject>");
             }
         }
-        
+
         WriteResult result;
         WriteConcern wc = extractWriteConcern(exchange);
         if (singleInsert) {
-            result = wc == null ? dbCol.insert((DBObject) insert) : dbCol.insert((DBObject) insert, wc);
+            result = wc == null ? dbCol.insert((DBObject)insert) : dbCol.insert((DBObject)insert, wc);
         } else {
-            result = wc == null ? dbCol.insert((List<DBObject>) insert) : dbCol.insert((List<DBObject>) insert, wc);
+            result = wc == null ? dbCol.insert((List<DBObject>)insert) : dbCol.insert((List<DBObject>)insert, wc);
         }
-        
+
         Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.insert);
-        // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or 
+        // we always return the WriteResult, because whether the getLastError
+        // was called or not, the user will have the means to call it or
         // obtain the cached CommandResult
         processAndTransferWriteResult(result, exchange);
         resultMessage.setBody(result);
@@ -265,21 +280,23 @@ public class MongoDbProducer extends DefaultProducer {
 
     protected void doFindAll(Exchange exchange) throws Exception {
         DBCollection dbCol = calculateCollection(exchange);
-        // do not use getMandatoryBody, because if the body is empty we want to retrieve all objects in the collection
+        // do not use getMandatoryBody, because if the body is empty we want to
+        // retrieve all objects in the collection
         DBObject query = null;
-        // do not run around looking for a type converter unless there is a need for it
+        // do not run around looking for a type converter unless there is a need
+        // for it
         if (exchange.getIn().getBody() != null) {
             query = exchange.getIn().getBody(DBObject.class);
         }
         DBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, DBObject.class);
-        
+
         // get the batch size and number to skip
         Integer batchSize = exchange.getIn().getHeader(MongoDbConstants.BATCH_SIZE, Integer.class);
         Integer numToSkip = exchange.getIn().getHeader(MongoDbConstants.NUM_TO_SKIP, Integer.class);
         Integer limit = exchange.getIn().getHeader(MongoDbConstants.LIMIT, Integer.class);
         DBObject sortBy = exchange.getIn().getHeader(MongoDbConstants.SORT_BY, DBObject.class);
         DBCursor ret = null;
-        try {  
+        try {
             if (query == null && fieldFilter == null) {
                 ret = dbCol.find(new BasicDBObject());
             } else if (fieldFilter == null) {
@@ -287,28 +304,28 @@ public class MongoDbProducer extends DefaultProducer {
             } else {
                 ret = dbCol.find(query, fieldFilter);
             }
-            
+
             if (sortBy != null) {
                 ret.sort(sortBy);
             }
-            
+
             if (batchSize != null) {
                 ret.batchSize(batchSize.intValue());
             }
-            
+
             if (numToSkip != null) {
                 ret.skip(numToSkip.intValue());
             }
-    
+
             if (limit != null) {
                 ret.limit(limit.intValue());
             }
-            
+
             Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.findAll);
             resultMessage.setBody(ret.toArray());
             resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret.count());
             resultMessage.setHeader(MongoDbConstants.RESULT_PAGE_SIZE, ret.size());
-            
+
         } catch (Exception e) {
             // rethrow the exception
             throw e;
@@ -318,7 +335,7 @@ public class MongoDbProducer extends DefaultProducer {
                 ret.close();
             }
         }
-        
+
     }
 
     protected void doFindOneByQuery(Exchange exchange) throws Exception {
@@ -332,7 +349,7 @@ public class MongoDbProducer extends DefaultProducer {
         } else {
             ret = dbCol.findOne(o, fieldFilter);
         }
-        
+
         Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.findOneByQuery);
         resultMessage.setBody(ret);
         resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
@@ -405,7 +422,9 @@ public class MongoDbProducer extends DefaultProducer {
         }
         
         dbCol = dynamicCollection == null ? db.getCollection(endpoint.getCollection()) : db.getCollection(dynamicCollection);
-        LOG.debug("Dynamic database and/or collection selected: {}->{}", dbCol.getDB().getName(), dbCol.getName());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Dynamic database and/or collection selected: {}->{}", dbCol.getDB().getName(), dbCol.getName());
+        }
         return dbCol;
     }
     
@@ -423,18 +442,19 @@ public class MongoDbProducer extends DefaultProducer {
                 exchange.setException(MongoDbComponent.wrapInCamelMongoDbException(cr.getException()));
             }
         }
-        
-        // determine where to set the WriteResult: as the OUT body or as an IN message header
+
+        // determine where to set the WriteResult: as the OUT body or as an IN
+        // message header
         if (endpoint.isWriteResultAsHeader()) {
             exchange.getOut().setHeader(MongoDbConstants.WRITERESULT, result);
         } else {
             exchange.getOut().setBody(result);
         }
     }
-    
+
     private WriteConcern extractWriteConcern(Exchange exchange) throws CamelMongoDbException {
         Object o = exchange.getIn().getHeader(MongoDbConstants.WRITECONCERN);
-        
+
         if (o == null) {
             return null;
         } else if (o instanceof WriteConcern) {
@@ -442,16 +462,16 @@ public class MongoDbProducer extends DefaultProducer {
         } else if (o instanceof String) {
             WriteConcern answer = WriteConcern.valueOf(ObjectHelper.cast(String.class, o));
             if (answer == null) {
-                throw new CamelMongoDbException("WriteConcern specified in the " + MongoDbConstants.WRITECONCERN 
-                        + " header, with value " + o + " could not be resolved to a WriteConcern type");
+                throw new CamelMongoDbException("WriteConcern specified in the " + MongoDbConstants.WRITECONCERN + " header, with value " + o
+                                                + " could not be resolved to a WriteConcern type");
             }
         }
-        
+
         // should never get here
         LOG.warn("A problem occurred while resolving the Exchange's Write Concern");
         return null;
     }
-    
+
     @SuppressWarnings("rawtypes")
     private List<DBObject> attemptConvertToList(List insertList, Exchange exchange) throws CamelMongoDbException {
         List<DBObject> dbObjectList = new ArrayList<DBObject>(insertList.size());
@@ -466,7 +486,7 @@ public class MongoDbProducer extends DefaultProducer {
         }
         return dbObjectList;
     }
-    
+
     private Message prepareResponseMessage(Exchange exchange, MongoDbOperation operation) {
         Message answer = exchange.getOut();
         MessageHelper.copyHeaders(exchange.getIn(), answer, false);
@@ -475,9 +495,9 @@ public class MongoDbProducer extends DefaultProducer {
         }
         return answer;
     }
-    
+
     private boolean isWriteOperation(MongoDbOperation operation) {
         return MongoDbComponent.WRITE_OPERATIONS.contains(operation);
     }
-    
+
 }

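The doUpdate() code shown above expects a specific body shape; a minimal sketch (not part of the patch) of invoking an update through a producer endpoint, where "direct:update" is a hypothetical route to a mongodb endpoint with operation=update:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import com.mongodb.BasicDBObject;
    import com.mongodb.DBObject;
    import org.apache.camel.CamelContext;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.component.mongodb.MongoDbConstants;

    public final class UpdateBodyExample {

        private UpdateBodyExample() {
        }

        // doUpdate() requires the IN body to be a two-element List<DBObject>:
        // element 0 is the update criteria, element 1 the new object. The optional
        // MULTIUPDATE/UPSERT headers are resolved to false when absent.
        public static Object updateAllMatching(CamelContext context) {
            List<DBObject> body = new ArrayList<DBObject>();
            body.add(new BasicDBObject("a", "1"));
            body.add(new BasicDBObject("$set", new BasicDBObject("b", "updated")));

            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put(MongoDbConstants.MULTIUPDATE, Boolean.TRUE);

            ProducerTemplate template = context.createProducerTemplate();
            return template.requestBodyAndHeaders("direct:update", body, headers);
        }
    }
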
http://git-wip-us.apache.org/repos/asf/camel/blob/d5f42992/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java
new file mode 100644
index 0000000..25468f5
--- /dev/null
+++ b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.WriteResult;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class MongoDbIndexTest extends AbstractMongoDbTest {
+
+    @Test
+    public void testInsertDynamicityEnabledDBAndCollectionAndIndex() {
+        assertEquals(0, testCollection.count());
+        mongo.getDB("otherDB").dropDatabase();
+        db.getCollection("otherCollection").drop();
+        assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
+
+        String body = "{\"_id\": \"testInsertDynamicityEnabledDBAndCollection\", \"a\" : \"1\", \"b\" : \"2\"}";
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(MongoDbConstants.DATABASE, "otherDB");
+        headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+
+        List<DBObject> objIndex = new ArrayList<DBObject>();
+        DBObject index1 = new BasicDBObject();
+        index1.put("a", 1);
+        DBObject index2 = new BasicDBObject();
+        index2.put("b", -1);
+        objIndex.add(index1);
+        objIndex.add(index2);
+        headers.put(MongoDbConstants.COLLECTION_INDEX, objIndex);
+
+        Object result = template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+        assertEquals("Response isn't of type WriteResult", WriteResult.class, result.getClass());
+
+        DBCollection dynamicCollection = mongo.getDB("otherDB").getCollection("otherCollection");
+
+        List<DBObject> indexInfos = dynamicCollection.getIndexInfo();
+
+        BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key");
+        BasicDBObject key2 = (BasicDBObject) indexInfos.get(2).get("key");
+
+        assertTrue("No index on the field a", key1.containsField("a") && "1".equals(key1.getString("a")));
+        assertTrue("No index on the field b", key2.containsField("b") && "-1".equals(key2.getString("b")));
+
+        DBObject b = dynamicCollection.findOne("testInsertDynamicityEnabledDBAndCollection");
+        assertNotNull("No record with 'testInsertDynamicityEnabledDBAndCollection' _id", b);
+
+        b = testCollection.findOne("testInsertDynamicityEnabledDBOnly");
+        assertNull("There is a record with 'testInsertDynamicityEnabledDBAndCollection' _id in the test collection", b);
+
+        assertTrue("The otherDB database should exist", mongo.getDatabaseNames().contains("otherDB"));
+    }
+
+    @Test
+    public void testInsertDynamicityEnabledCollectionAndIndex() {
+        assertEquals(0, testCollection.count());
+        mongo.getDB("otherDB").dropDatabase();
+        db.getCollection("otherCollection").drop();
+        assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
+
+        String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionAndIndex\", \"a\" : \"1\", \"b\" : \"2\"}";
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+
+        List<DBObject> objIndex = new ArrayList<DBObject>();
+        DBObject index1 = new BasicDBObject();
+        index1.put("a", 1);
+        DBObject index2 = new BasicDBObject();
+        index2.put("b", -1);
+        objIndex.add(index1);
+        objIndex.add(index2);
+        headers.put(MongoDbConstants.COLLECTION_INDEX, objIndex);
+
+        Object result = template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers);
+
+        assertEquals("Response isn't of type WriteResult", WriteResult.class, result.getClass());
+
+        DBCollection dynamicCollection = db.getCollection("otherCollection");
+
+        List<DBObject> indexInfos = dynamicCollection.getIndexInfo();
+
+        BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key");
+        BasicDBObject key2 = (BasicDBObject) indexInfos.get(2).get("key");
+
+        assertTrue("No index on the field a", key1.containsField("a") && "1".equals(key1.getString("a")));
+        assertTrue("No index on the field b", key2.containsField("b") && "-1".equals(key2.getString("b")));
+
+        DBObject b = dynamicCollection.findOne("testInsertDynamicityEnabledCollectionAndIndex");
+        assertNotNull("No record with 'testInsertDynamicityEnabledCollectionAndIndex' _id", b);
+
+        b = testCollection.findOne("testInsertDynamicityEnabledDBOnly");
+        assertNull("There is a record with 'testInsertDynamicityEnabledDBAndCollection' _id in the test collection", b);
+
+        assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
+    }
+
+    @Test
+    public void testInsertDynamicityEnabledCollectionOnlyAndURIIndex() {
+        assertEquals(0, testCollection.count());
+        mongo.getDB("otherDB").dropDatabase();
+        db.getCollection("otherCollection").drop();
+        assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
+
+        String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionOnlyAndURIIndex\", \"a\" : \"1\", \"b\" : \"2\"}";
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put(MongoDbConstants.COLLECTION, "otherCollection");
+
+        Object result = template.requestBodyAndHeaders("direct:dynamicityEnabledWithIndexUri", body, headers);
+
+        assertEquals("Response isn't of type WriteResult", WriteResult.class, result.getClass());
+
+        DBCollection dynamicCollection = db.getCollection("otherCollection");
+        List<DBObject> indexInfos = dynamicCollection.getIndexInfo();
+
+        BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key");
+
+        assertFalse("No index on the field a", key1.containsField("a") && "-1".equals(key1.getString("a")));
+
+        DBObject b = dynamicCollection.findOne("testInsertDynamicityEnabledCollectionOnlyAndURIIndex");
+        assertNotNull("No record with 'testInsertDynamicityEnabledCollectionOnlyAndURIIndex' _id", b);
+
+        b = testCollection.findOne("testInsertDynamicityEnabledCollectionOnlyAndURIIndex");
+        assertNull("There is a record with 'testInsertDynamicityEnabledCollectionOnlyAndURIIndex' _id in the test collection", b);
+
+        assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
+    }
+
+    @Test
+    public void testInsertAutoCreateCollectionAndURIIndex() {
+        assertEquals(0, testCollection.count());
+        db.getCollection("otherCollection").remove(new BasicDBObject());
+
+        String body = "{\"_id\": \"testInsertAutoCreateCollectionAndURIIndex\", \"a\" : \"1\", \"b\" : \"2\"}";
+        Map<String, Object> headers = new HashMap<String, Object>();
+
+        Object result = template.requestBodyAndHeaders("direct:dynamicityDisabled", body, headers);
+        assertEquals("Response isn't of type WriteResult", WriteResult.class, result.getClass());
+
+        DBCollection collection = db.getCollection("otherCollection");
+        List<DBObject> indexInfos = collection.getIndexInfo();
+
+        BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key");
+        BasicDBObject key2 = (BasicDBObject) indexInfos.get(2).get("key");
+
+        assertTrue("No index on the field b", key1.containsField("b") && "-1".equals(key1.getString("b")));
+        assertTrue("No index on the field a", key2.containsField("a") && "1".equals(key2.getString("a")));
+
+        DBObject b = collection.findOne("testInsertAutoCreateCollectionAndURIIndex");
+        assertNotNull("No record with 'testInsertAutoCreateCollectionAndURIIndex' _id", b);
+
+        b = testCollection.findOne("testInsertAutoCreateCollectionAndURIIndex");
+        assertNull("There is a record with 'testInsertAutoCreateCollectionAndURIIndex' _id in the test collection", b);
+
+        assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:dynamicityEnabled")
+                        .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=true&writeConcern=SAFE");
+                from("direct:dynamicityEnabledWithIndexUri")
+                        .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&collectionIndex={\"a\":\"1\"}&operation=insert&dynamicity=true&writeConcern=SAFE");
+                from("direct:dynamicityDisabled")
+                        .to("mongodb:myDb?database={{mongodb.testDb}}&collection=otherCollection&collectionIndex={\"a\":\"1\",\"b\":\"-1\"}&operation=insert&dynamicity=false&writeConcern=SAFE");
+            }
+        };
+    }
+}