You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by bv...@apache.org on 2013/10/02 07:14:52 UTC

git commit: CAMEL-6000: Applied the part of the provided patch not being applied into the codebase (part related to MongoDbProducer#calculateCollection method). Also restored the previous Javadoc / comment formatting being changed through the previous commit of this ticket.

Updated Branches:
  refs/heads/master 80a14c8af -> d2bd97bd3


CAMEL-6000: Applied the part of the provided patch not being applied into the codebase (part related to MongoDbProducer#calculateCollection method). Also restored the previous Javadoc / comment formatting being changed through the previous commit of this ticket.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d2bd97bd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d2bd97bd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d2bd97bd

Branch: refs/heads/master
Commit: d2bd97bd303a8049f8de88a552bf7bbe0b7b3ba3
Parents: 80a14c8
Author: Babak Vahdat <bv...@apache.org>
Authored: Wed Oct 2 07:14:40 2013 +0200
Committer: Babak Vahdat <bv...@apache.org>
Committed: Wed Oct 2 07:14:40 2013 +0200

----------------------------------------------------------------------
 .../component/mongodb/MongoDbEndpoint.java      | 167 ++++++++-----------
 .../component/mongodb/MongoDbProducer.java      |  88 +++++-----
 .../component/mongodb/MongoDbIndexTest.java     |  28 ++--
 3 files changed, 132 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d2bd97bd/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index 3bcfa9e..7dfcbde 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -43,10 +43,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Represents a MongoDb endpoint. It is responsible for creating
- * {@link MongoDbProducer} and {@link MongoDbTailableCursorConsumer} instances.
- * It accepts a number of options to customise the behaviour of consumers and
- * producers.
+ * Represents a MongoDb endpoint. 
+ * It is responsible for creating {@link MongoDbProducer} and {@link MongoDbTailableCursorConsumer} instances.
+ * It accepts a number of options to customise the behaviour of consumers and producers.
  */
 public class MongoDbEndpoint extends DefaultEndpoint {
 
@@ -158,9 +157,8 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Initialises the MongoDB connection using the Mongo object provided to the
-     * endpoint
-     *
+     * Initialises the MongoDB connection using the Mongo object provided to the endpoint
+     * 
      * @throws CamelMongoDbException
      */
     public void initializeConnection() throws CamelMongoDbException {
@@ -218,10 +216,11 @@ public class MongoDbEndpoint extends DefaultEndpoint {
 
             for (Map.Entry<String, String> set : indexMap.entrySet()) {
                 DBObject index = new BasicDBObject();
-                // MongoDB 2.4 upwards is restrictive about the type of the 'single field index' being in use so that
-                // we should convert the index value to an Integer, see also:
+                // MongoDB 2.4 upwards is restrictive about the type of the 'single field index' being
+                // in use below (set.getValue())) as only an integer value type is accepted, otherwise
+                // server will throw an exception, see more details:
                 // http://docs.mongodb.org/manual/release-notes/2.4/#improved-validation-of-index-types
-                index.put(set.getKey(), Integer.valueOf(set.getValue()));
+                index.put(set.getKey(), set.getValue());
 
                 indexList.add(index);
             }
@@ -230,8 +229,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Applies validation logic specific to this endpoint type. If everything
-     * succeeds, continues initialization
+     * Applies validation logic specific to this endpoint type. If everything succeeds, continues initialization
      */
     @Override
     protected void doStart() throws Exception {
@@ -270,13 +268,13 @@ public class MongoDbEndpoint extends DefaultEndpoint {
             mongoConnection.setReadPreference(readPreference);
         }
     }
-
-    // ======= Getters and setters
-    // ===============================================
-
+    
+    
+    // ======= Getters and setters ===============================================
+    
     /**
      * Sets the name of the MongoDB collection to bind to this endpoint
-     *
+     * 
      * @param collection collection name
      */
     public void setCollection(String collection) {
@@ -288,8 +286,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Sets the collection index (JSON FORMAT : { "field1" : "order", "field2" :
-     * "order"})
+     * Sets the collection index (JSON FORMAT : { "field1" : order1, "field2" : order2})
      */
     public void setCollectionIndex(String collectionIndex) {
         this.collectionIndex = collectionIndex;
@@ -300,10 +297,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Sets the operation this endpoint will execute against MongoDB. For
-     * possible values, see {@link MongoDbOperation}.
-     *
+     * Sets the operation this endpoint will execute against MongoDB. For possible values, see {@link MongoDbOperation}.
      * @param operation name of the operation as per catalogued values
+     * 
      * @throws CamelMongoDbException
      */
     public void setOperation(String operation) throws CamelMongoDbException {
@@ -320,7 +316,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
 
     /**
      * Sets the name of the MongoDB database to target
-     *
+     * 
      * @param database name of the MongoDB database
      */
     public void setDatabase(String database) {
@@ -332,9 +328,8 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Create collection during initialisation if it doesn't exist. Default is
-     * true.
-     *
+     * Create collection during initialisation if it doesn't exist. Default is true.
+     * 
      * @param createCollection true or false
      */
     public void setCreateCollection(boolean createCollection) {
@@ -355,7 +350,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
 
     /**
      * Sets the Mongo instance that represents the backing connection
-     *
+     * 
      * @param mongoConnection the connection to the database
      */
     public void setMongoConnection(Mongo mongoConnection) {
@@ -367,14 +362,11 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Set the {@link WriteConcern} for write operations on MongoDB using the
-     * standard ones. Resolved from the fields of the WriteConcern class by
-     * calling the {@link WriteConcern#valueOf(String)} method.
-     *
+     * Set the {@link WriteConcern} for write operations on MongoDB using the standard ones.
+     * Resolved from the fields of the WriteConcern class by calling the {@link WriteConcern#valueOf(String)} method.
+     * 
      * @param writeConcern the standard name of the WriteConcern
-     * @see <a
-     *      href="http://api.mongodb.org/java/current/com/mongodb/WriteConcern.html#valueOf(java.lang.String)">possible
-     *      options</a>
+     * @see <a href="http://api.mongodb.org/java/current/com/mongodb/WriteConcern.html#valueOf(java.lang.String)">possible options</a>
      */
     public void setWriteConcern(String writeConcern) {
         this.writeConcern = WriteConcern.valueOf(writeConcern);
@@ -385,11 +377,10 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Instructs this endpoint to invoke {@link WriteResult#getLastError()} with
-     * every operation. By default, MongoDB does not wait for the write
-     * operation to occur before returning. If set to true, each exchange will
-     * only return after the write operation has actually occurred in MongoDB.
-     *
+     * Instructs this endpoint to invoke {@link WriteResult#getLastError()} with every operation. By default, MongoDB does not wait
+     * for the write operation to occur before returning. If set to true, each exchange will only return after the write operation 
+     * has actually occurred in MongoDB.
+     * 
      * @param invokeGetLastError true or false
      */
     public void setInvokeGetLastError(boolean invokeGetLastError) {
@@ -403,7 +394,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     /**
      * Set the {@link WriteConcern} for write operations on MongoDB, passing in the bean ref to a custom WriteConcern which exists in the Registry.
      * You can also use standard WriteConcerns by passing in their key. See the {@link #setWriteConcern(String) setWriteConcern} method.
-     *
+     * 
      * @param writeConcernRef the name of the bean in the registry that represents the WriteConcern to use
      */
     public void setWriteConcernRef(String writeConcernRef) {
@@ -421,11 +412,10 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         return writeConcernRef;
     }
 
-    /**
-     * Sets a MongoDB {@link ReadPreference} on the Mongo connection. Read
-     * preferences set directly on the connection will be overridden by this
-     * setting.
-     *
+    /** 
+     * Sets a MongoDB {@link ReadPreference} on the Mongo connection. Read preferences set directly on the connection will be
+     * overridden by this setting.
+     * 
      * @param readPreference the bean name of the read preference to set
      */
     public void setReadPreference(String readPreference) {
@@ -451,17 +441,13 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Sets whether this endpoint will attempt to dynamically resolve the target
-     * database and collection from the incoming Exchange properties. Can be
-     * used to override at runtime the database and collection specified on the
-     * otherwise static endpoint URI. It is disabled by default to boost
-     * performance. Enabling it will take a minimal performance hit.
-     *
-     * @param dynamicity true or false indicated whether target database and
-     *                   collection should be calculated dynamically based on Exchange
-     *                   properties.
+     * Sets whether this endpoint will attempt to dynamically resolve the target database and collection from the incoming Exchange properties.
+     * Can be used to override at runtime the database and collection specified on the otherwise static endpoint URI.
+     * It is disabled by default to boost performance. Enabling it will take a minimal performance hit.
+     * 
      * @see MongoDbConstants#DATABASE
      * @see MongoDbConstants#COLLECTION
+     * @param dynamicity true or false indicated whether target database and collection should be calculated dynamically based on Exchange properties.
      */
     public void setDynamicity(boolean dynamicity) {
         this.dynamicity = dynamicity;
@@ -494,11 +480,10 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Indicates what database the tail tracking mechanism will persist to. If
-     * not specified, the current database will be picked by default. Dynamicity
-     * will not be taken into account even if enabled, i.e. the tail tracking
-     * database will not vary past endpoint initialisation.
-     *
+     * Indicates what database the tail tracking mechanism will persist to. If not specified, the current database will 
+     * be picked by default. Dynamicity will not be taken into account even if enabled, i.e. the tail tracking database 
+     * will not vary past endpoint initialisation.
+     * 
      * @param tailTrackDb database name
      */
     public void setTailTrackDb(String tailTrackDb) {
@@ -510,10 +495,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Collection where tail tracking information will be persisted. If not
-     * specified, {@link MongoDbTailTrackingConfig#DEFAULT_COLLECTION} will be
-     * used by default.
-     *
+     * Collection where tail tracking information will be persisted. If not specified, {@link MongoDbTailTrackingConfig#DEFAULT_COLLECTION} 
+     * will be used by default.
+     * 
      * @param tailTrackCollection collection name
      */
     public void setTailTrackCollection(String tailTrackCollection) {
@@ -525,9 +509,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Field where the last tracked value will be placed. If not specified,
-     * {@link MongoDbTailTrackingConfig#DEFAULT_FIELD} will be used by default.
-     *
+     * Field where the last tracked value will be placed. If not specified,  {@link MongoDbTailTrackingConfig#DEFAULT_FIELD} 
+     * will be used by default.
+     * 
      * @param tailTrackField field name
      */
     public void setTailTrackField(String tailTrackField) {
@@ -535,11 +519,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Enable persistent tail tracking, which is a mechanism to keep track of
-     * the last consumed message across system restarts. The next time the
-     * system is up, the endpoint will recover the cursor from the point where
-     * it last stopped slurping records.
-     *
+     * Enable persistent tail tracking, which is a mechanism to keep track of the last consumed message across system restarts.
+     * The next time the system is up, the endpoint will recover the cursor from the point where it last stopped slurping records.
+     * 
      * @param persistentTailTracking true or false
      */
     public void setPersistentTailTracking(boolean persistentTailTracking) {
@@ -551,14 +533,13 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Correlation field in the incoming record which is of increasing nature
-     * and will be used to position the tailing cursor every time it is
-     * generated. The cursor will be (re)created with a query of type:
-     * tailTrackIncreasingField > lastValue (possibly recovered from persistent
-     * tail tracking). Can be of type Integer, Date, String, etc. NOTE: No
-     * support for dot notation at the current time, so the field should be at
-     * the top level of the document.
-     *
+     * Correlation field in the incoming record which is of increasing nature and will be used to position the tailing cursor every 
+     * time it is generated.
+     * The cursor will be (re)created with a query of type: tailTrackIncreasingField > lastValue (possibly recovered from persistent
+     * tail tracking).
+     * Can be of type Integer, Date, String, etc.
+     * NOTE: No support for dot notation at the current time, so the field should be at the top level of the document.
+     * 
      * @param tailTrackIncreasingField
      */
     public void setTailTrackIncreasingField(String tailTrackIncreasingField) {
@@ -578,13 +559,10 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * MongoDB tailable cursors will block until new data arrives. If no new
-     * data is inserted, after some time the cursor will be automatically freed
-     * and closed by the MongoDB server. The client is expected to regenerate
-     * the cursor if needed. This value specifies the time to wait before
-     * attempting to fetch a new cursor, and if the attempt fails, how long
-     * before the next attempt is made. Default value is 1000ms.
-     *
+     * MongoDB tailable cursors will block until new data arrives. If no new data is inserted, after some time the cursor will be automatically
+     * freed and closed by the MongoDB server. The client is expected to regenerate the cursor if needed. This value specifies the time to wait
+     * before attempting to fetch a new cursor, and if the attempt fails, how long before the next attempt is made. Default value is 1000ms.
+     * 
      * @param cursorRegenerationDelay delay specified in milliseconds
      */
     public void setCursorRegenerationDelay(long cursorRegenerationDelay) {
@@ -596,12 +574,10 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * One tail tracking collection can host many trackers for several tailable
-     * consumers. To keep them separate, each tracker should have its own unique
-     * persistentId.
-     *
-     * @param persistentId the value of the persistent ID to use for this
-     *                     tailable consumer
+     * One tail tracking collection can host many trackers for several tailable consumers. 
+     * To keep them separate, each tracker should have its own unique persistentId.
+     * 
+     * @param persistentId the value of the persistent ID to use for this tailable consumer
      */
     public void setPersistentId(String persistentId) {
         this.persistentId = persistentId;
@@ -616,10 +592,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * In write operations, it determines whether instead of returning
-     * {@link WriteResult} as the body of the OUT message, we transfer the IN
-     * message to the OUT and attach the WriteResult as a header.
-     *
+     * In write operations, it determines whether instead of returning {@link WriteResult} as the body of the OUT
+     * message, we transfer the IN message to the OUT and attach the WriteResult as a header.
+     * 
      * @param writeResultAsHeader flag to indicate if this option is enabled
      */
     public void setWriteResultAsHeader(boolean writeResultAsHeader) {

http://git-wip-us.apache.org/repos/asf/camel/blob/d2bd97bd/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
index 1f16636..64b1810 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
@@ -77,8 +77,7 @@ public class MongoDbProducer extends DefaultProducer {
     }
 
     /**
-     * Entry method that selects the appropriate MongoDB operation and executes
-     * it
+     * Entry method that selects the appropriate MongoDB operation and executes it
      * 
      * @param operation
      * @param exchange
@@ -143,8 +142,7 @@ public class MongoDbProducer extends DefaultProducer {
         if (operation == MongoDbOperation.getColStats) {
             result = calculateCollection(exchange).getStats();
         } else if (operation == MongoDbOperation.getDbStats) {
-            // if it's a DB, also take into account the dynamicity option and
-            // the DB that is used
+            // if it's a DB, also take into account the dynamicity option and the DB that is used
             result = calculateCollection(exchange).getDB().getStats();
         } else {
             throw new CamelMongoDbException("Internal error: wrong operation for getStats variant" + operation);
@@ -162,10 +160,8 @@ public class MongoDbProducer extends DefaultProducer {
         WriteResult result = wc == null ? dbCol.remove(removeObj) : dbCol.remove(removeObj, wc);
 
         Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.remove);
-        // we always return the WriteResult, because whether the getLastError
-        // was called or not,
-        // the user will have the means to call it or obtain the cached
-        // CommandResult
+        // we always return the WriteResult, because whether the getLastError was called or not,
+        // the user will have the means to call it or obtain the cached CommandResult
         processAndTransferWriteResult(result, exchange);
         resultMessage.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
     }
@@ -186,15 +182,11 @@ public class MongoDbProducer extends DefaultProducer {
 
         WriteResult result;
         WriteConcern wc = extractWriteConcern(exchange);
-        // In API 2.7, the default upsert and multi values of update(DBObject,
-        // DBObject) are false, false, so we unconditionally invoke the
-        // full-signature method update(DBObject, DBObject, boolean, boolean).
-        // However, the default behaviour may change in the future,
+        // In API 2.7, the default upsert and multi values of update(DBObject, DBObject) are false, false, so we unconditionally invoke the
+        // full-signature method update(DBObject, DBObject, boolean, boolean). However, the default behaviour may change in the future, 
         // so it's safer to be explicit at this level for full determinism
         if (multi == null && upsert == null) {
-            // for update with no multi nor upsert but with specific
-            // WriteConcern there is no update signature without multi and
-            // upsert args,
+            // for update with no multi nor upsert but with specific WriteConcern there is no update signature without multi and upsert args,
             // so assume defaults
             result = wc == null ? dbCol.update(updateCriteria, objNew) : dbCol.update(updateCriteria, objNew, false, false, wc);
         } else {
@@ -205,8 +197,7 @@ public class MongoDbProducer extends DefaultProducer {
         }
 
         Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.update);
-        // we always return the WriteResult, because whether the getLastError
-        // was called or not, the user will have the means to call it or
+        // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or 
         // obtain the cached CommandResult
         processAndTransferWriteResult(result, exchange);
         resultMessage.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
@@ -220,8 +211,7 @@ public class MongoDbProducer extends DefaultProducer {
         WriteResult result = wc == null ? dbCol.save(saveObj) : dbCol.save(saveObj, wc);
 
         prepareResponseMessage(exchange, MongoDbOperation.save);
-        // we always return the WriteResult, because whether the getLastError
-        // was called or not, the user will have the means to call it or
+        // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or 
         // obtain the cached CommandResult
         processAndTransferWriteResult(result, exchange);
     }
@@ -248,12 +238,10 @@ public class MongoDbProducer extends DefaultProducer {
         DBCollection dbCol = calculateCollection(exchange);
         boolean singleInsert = true;
         Object insert = exchange.getIn().getBody(DBObject.class);
-        // body could not be converted to DBObject, check to see if it's of type
-        // List<DBObject>
+        // body could not be converted to DBObject, check to see if it's of type List<DBObject>
         if (insert == null) {
             insert = exchange.getIn().getBody(List.class);
-            // if the body of type List was obtained, ensure that all items are
-            // of type DBObject and cast the List to List<DBObject>
+            // if the body of type List was obtained, ensure that all items are of type DBObject and cast the List to List<DBObject>
             if (insert != null) {
                 singleInsert = false;
                 insert = attemptConvertToList((List)insert, exchange);
@@ -271,8 +259,7 @@ public class MongoDbProducer extends DefaultProducer {
         }
 
         Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.insert);
-        // we always return the WriteResult, because whether the getLastError
-        // was called or not, the user will have the means to call it or
+        // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or 
         // obtain the cached CommandResult
         processAndTransferWriteResult(result, exchange);
         resultMessage.setBody(result);
@@ -280,11 +267,9 @@ public class MongoDbProducer extends DefaultProducer {
 
     protected void doFindAll(Exchange exchange) throws Exception {
         DBCollection dbCol = calculateCollection(exchange);
-        // do not use getMandatoryBody, because if the body is empty we want to
-        // retrieve all objects in the collection
+        // do not use getMandatoryBody, because if the body is empty we want to retrieve all objects in the collection
         DBObject query = null;
-        // do not run around looking for a type converter unless there is a need
-        // for it
+        // do not run around looking for a type converter unless there is a need for it
         if (exchange.getIn().getBody() != null) {
             query = exchange.getIn().getBody(DBObject.class);
         }
@@ -400,7 +385,7 @@ public class MongoDbProducer extends DefaultProducer {
     }
     // --------- Convenience methods -----------------------
     
-    private DBCollection calculateCollection(Exchange exchange) {
+    private DBCollection calculateCollection(Exchange exchange) throws Exception {
         // dynamic calculation is an option. In most cases it won't be used and we should not penalise all users with running this
         // resolution logic on every Exchange if they won't be using this functionality at all
         if (!endpoint.isDynamicity()) {
@@ -409,19 +394,37 @@ public class MongoDbProducer extends DefaultProducer {
         
         String dynamicDB = exchange.getIn().getHeader(MongoDbConstants.DATABASE, String.class);
         String dynamicCollection = exchange.getIn().getHeader(MongoDbConstants.COLLECTION, String.class);
-        
-        if (dynamicDB == null && dynamicCollection == null) {
-            return endpoint.getDbCollection();
-        }
-        
-        DB db = endpoint.getDb();
+                
+        @SuppressWarnings("unchecked")
+        List<DBObject> dynamicIndex = exchange.getIn().getHeader(MongoDbConstants.COLLECTION_INDEX, List.class);
+
         DBCollection dbCol = null;
         
-        if (dynamicDB != null) {
-            db = endpoint.getMongoConnection().getDB(dynamicDB);
+        if (dynamicDB == null && dynamicCollection == null) {
+            dbCol = endpoint.getDbCollection();
+        } else {
+            DB db = null;
+
+            if (dynamicDB == null) {
+                db = endpoint.getDb();
+            } else {
+                db = endpoint.getMongoConnection().getDB(dynamicDB);
+            }
+
+            if (dynamicCollection == null) {
+                dbCol = db.getCollection(endpoint.getCollection());
+            } else {
+                dbCol = db.getCollection(dynamicCollection);
+
+                // on the fly add index
+                if (dynamicIndex == null) {
+                    endpoint.ensureIndex(dbCol, endpoint.createIndex());
+                } else {
+                    endpoint.ensureIndex(dbCol, dynamicIndex);
+                }
+            }
         }
-        
-        dbCol = dynamicCollection == null ? db.getCollection(endpoint.getCollection()) : db.getCollection(dynamicCollection);
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Dynamic database and/or collection selected: {}->{}", dbCol.getDB().getName(), dbCol.getName());
         }
@@ -442,9 +445,8 @@ public class MongoDbProducer extends DefaultProducer {
                 exchange.setException(MongoDbComponent.wrapInCamelMongoDbException(cr.getException()));
             }
         }
-
-        // determine where to set the WriteResult: as the OUT body or as an IN
-        // message header
+        
+        // determine where to set the WriteResult: as the OUT body or as an IN message header
         if (endpoint.isWriteResultAsHeader()) {
             exchange.getOut().setHeader(MongoDbConstants.WRITERESULT, result);
         } else {

http://git-wip-us.apache.org/repos/asf/camel/blob/d2bd97bd/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java
index 8081577..576a951 100644
--- a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java
+++ b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java
@@ -37,7 +37,7 @@ public class MongoDbIndexTest extends AbstractMongoDbTest {
         db.getCollection("otherCollection").drop();
         assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
 
-        String body = "{\"_id\": \"testInsertDynamicityEnabledDBAndCollection\", \"a\" : \"1\", \"b\" : \"2\"}";
+        String body = "{\"_id\": \"testInsertDynamicityEnabledDBAndCollection\", \"a\" : 1, \"b\" : 2}";
         Map<String, Object> headers = new HashMap<String, Object>();
         headers.put(MongoDbConstants.DATABASE, "otherDB");
         headers.put(MongoDbConstants.COLLECTION, "otherCollection");
@@ -59,9 +59,11 @@ public class MongoDbIndexTest extends AbstractMongoDbTest {
 
         List<DBObject> indexInfos = dynamicCollection.getIndexInfo();
 
-        BasicDBObject key = (BasicDBObject) indexInfos.get(0).get("key");
+        BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key");
+        BasicDBObject key2 = (BasicDBObject) indexInfos.get(2).get("key");
 
-        assertTrue("The field _id with the expected value not found", key.containsField("_id") && "1".equals(key.getString("_id")));
+        assertTrue("No index on the field a", key1.containsField("a") && "1".equals(key1.getString("a")));
+        assertTrue("No index on the field b", key2.containsField("b") && "-1".equals(key2.getString("b")));
 
         DBObject b = dynamicCollection.findOne("testInsertDynamicityEnabledDBAndCollection");
         assertNotNull("No record with 'testInsertDynamicityEnabledDBAndCollection' _id", b);
@@ -79,7 +81,7 @@ public class MongoDbIndexTest extends AbstractMongoDbTest {
         db.getCollection("otherCollection").drop();
         assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
 
-        String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionAndIndex\", \"a\" : \"1\", \"b\" : \"2\"}";
+        String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionAndIndex\", \"a\" : 1, \"b\" : 2}";
         Map<String, Object> headers = new HashMap<String, Object>();
         headers.put(MongoDbConstants.COLLECTION, "otherCollection");
 
@@ -100,9 +102,11 @@ public class MongoDbIndexTest extends AbstractMongoDbTest {
 
         List<DBObject> indexInfos = dynamicCollection.getIndexInfo();
 
-        BasicDBObject key = (BasicDBObject) indexInfos.get(0).get("key");
+        BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key");
+        BasicDBObject key2 = (BasicDBObject) indexInfos.get(2).get("key");
 
-        assertTrue("The field _id with the expected value not found", key.containsField("_id") && "1".equals(key.getString("_id")));
+        assertTrue("No index on the field a", key1.containsField("a") && "1".equals(key1.getString("a")));
+        assertTrue("No index on the field b", key2.containsField("b") && "-1".equals(key2.getString("b")));
 
         DBObject b = dynamicCollection.findOne("testInsertDynamicityEnabledCollectionAndIndex");
         assertNotNull("No record with 'testInsertDynamicityEnabledCollectionAndIndex' _id", b);
@@ -120,7 +124,7 @@ public class MongoDbIndexTest extends AbstractMongoDbTest {
         db.getCollection("otherCollection").drop();
         assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB"));
 
-        String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionOnlyAndURIIndex\", \"a\" : \"1\", \"b\" : \"2\"}";
+        String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionOnlyAndURIIndex\", \"a\" : 1, \"b\" : 2}";
         Map<String, Object> headers = new HashMap<String, Object>();
         headers.put(MongoDbConstants.COLLECTION, "otherCollection");
 
@@ -132,9 +136,9 @@ public class MongoDbIndexTest extends AbstractMongoDbTest {
 
         List<DBObject> indexInfos = dynamicCollection.getIndexInfo();
 
-        BasicDBObject key = (BasicDBObject)indexInfos.get(0).get("key");
+        BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key");
 
-        assertTrue("The field _id with the expected value not found", key.containsField("_id") && "1".equals(key.getString("_id")));
+        assertFalse("No index on the field a", key1.containsField("a") && "-1".equals(key1.getString("a")));
 
         DBObject b = dynamicCollection.findOne("testInsertDynamicityEnabledCollectionOnlyAndURIIndex");
         assertNotNull("No record with 'testInsertDynamicityEnabledCollectionOnlyAndURIIndex' _id", b);
@@ -150,7 +154,7 @@ public class MongoDbIndexTest extends AbstractMongoDbTest {
         assertEquals(0, testCollection.count());
         db.getCollection("otherCollection").remove(new BasicDBObject());
 
-        String body = "{\"_id\": \"testInsertAutoCreateCollectionAndURIIndex\", \"a\" : \"1\", \"b\" : \"2\"}";
+        String body = "{\"_id\": \"testInsertAutoCreateCollectionAndURIIndex\", \"a\" : 1, \"b\" : 2}";
         Map<String, Object> headers = new HashMap<String, Object>();
 
         Object result = template.requestBodyAndHeaders("direct:dynamicityDisabled", body, headers);
@@ -181,9 +185,9 @@ public class MongoDbIndexTest extends AbstractMongoDbTest {
                 from("direct:dynamicityEnabled")
                         .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=true&writeConcern=SAFE");
                 from("direct:dynamicityEnabledWithIndexUri")
-                        .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&collectionIndex={\"a\":\"1\"}&operation=insert&dynamicity=true&writeConcern=SAFE");
+                        .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&collectionIndex={\"a\":1}&operation=insert&dynamicity=true&writeConcern=SAFE");
                 from("direct:dynamicityDisabled")
-                        .to("mongodb:myDb?database={{mongodb.testDb}}&collection=otherCollection&collectionIndex={\"a\":\"1\",\"b\":\"-1\"}&operation=insert&dynamicity=false&writeConcern=SAFE");
+                        .to("mongodb:myDb?database={{mongodb.testDb}}&collection=otherCollection&collectionIndex={\"a\":1,\"b\":-1}&operation=insert&dynamicity=false&writeConcern=SAFE");
             }
         };
     }