Posted to commits@camel.apache.org by ac...@apache.org on 2018/01/26 07:03:28 UTC

[camel] 01/02: CAMEL-12185 - Camel-MongoDB : add option outputType for aggregate operation

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e795ab99cb7da40763b4e8f08621bb907396cfbe
Author: Farès Hassak <fa...@gmail.com>
AuthorDate: Sat Jan 20 19:20:26 2018 +0100

    CAMEL-12185 - Camel-MongoDB : add option outputType for aggregate operation
---
 .../src/main/docs/mongodb-component.adoc           |  47 +++++++-
 .../camel/component/mongodb/MongoDbProducer.java   |  42 +++++---
 .../mongodb/MongoDbAggregateOperationTest.java     | 118 +++++++++++++++++++++
 .../component/mongodb/MongoDbOperationsTest.java   |  19 ----
 .../src/main/docs/mongodb3-component.adoc          |  44 +++++++-
 .../camel/component/mongodb3/MongoDbProducer.java  |  48 +++++----
 .../mongodb3/MongoDbAggregateOperationTest.java    | 116 ++++++++++++++++++++
 7 files changed, 383 insertions(+), 51 deletions(-)
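
Note on the producer change: with the default output type the aggregate result is fully loaded into a List, while the cursor-style output type (DBCursor or MongoIterable) hands the driver's iterable to the route so it can be consumed lazily. The following is a minimal, self-contained sketch of that pattern, not the committed code. `Cursor`, `Source` and `convert` are hypothetical stand-ins invented for illustration; they are not classes from camel-mongodb or the MongoDB Java driver.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class OutputTypeSketch {

    /** Hypothetical stand-in for a closeable driver cursor. */
    interface Cursor<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close();
    }

    /** Hypothetical stand-in for a driver iterable; every iterator() call opens a new cursor. */
    static class Source<T> implements Iterable<T> {
        private final List<T> docs;
        int opened;
        int closed;

        Source(List<T> docs) {
            this.docs = docs;
        }

        @Override
        public Cursor<T> iterator() {
            opened++;
            Iterator<T> it = docs.iterator();
            return new Cursor<T>() {
                public boolean hasNext() { return it.hasNext(); }
                public T next() { return it.next(); }
                public void close() { closed++; }
            };
        }
    }

    /** Returns the source itself when streaming, otherwise a fully loaded List. */
    static <T> Iterable<T> convert(Source<T> source, boolean stream) {
        if (stream) {
            return source; // the caller iterates lazily
        }
        List<T> loaded = new ArrayList<>();
        // Open one cursor, drain it, and close that same cursor.
        try (Cursor<T> cursor = source.iterator()) {
            cursor.forEachRemaining(loaded::add);
        }
        return loaded;
    }

    public static void main(String[] args) {
        Source<String> source = new Source<>(Arrays.asList("Darwin", "Einstein"));
        Iterable<String> list = convert(source, false);
        System.out.println(list instanceof List); // prints "true"
        System.out.println(source.opened + " opened, " + source.closed + " closed"); // prints "1 opened, 1 closed"
    }
}
```

The sketch keeps a reference to the cursor it drains so that the cursor being closed is the one that was iterated; in the streaming branch nothing is opened until the caller starts iterating.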

diff --git a/components/camel-mongodb/src/main/docs/mongodb-component.adoc b/components/camel-mongodb/src/main/docs/mongodb-component.adoc
index 2fac8cb..39dc979 100644
--- a/components/camel-mongodb/src/main/docs/mongodb-component.adoc
+++ b/components/camel-mongodb/src/main/docs/mongodb-component.adoc
@@ -86,7 +86,7 @@ with the following path and query parameters:
 | *createCollection* (common) | Create collection during initialisation if it doesn't exist. Default is true. | true | boolean
 | *database* (common) | Sets the name of the MongoDB database to target |  | String
 | *operation* (common) | Sets the operation this endpoint will execute against MongoDB. For possible values see MongoDbOperation. |  | MongoDbOperation
-| *outputType* (common) | Convert the output of the producer to the selected type : DBObjectList DBObject or DBCursor. DBObjectList or DBObject applies to findAll. DBCursor applies to all other operations. |  | MongoDbOutputType
+| *outputType* (common) | Convert the output of the producer to the selected type: DBObjectList, DBObject or DBCursor. DBObjectList or DBCursor apply to findAll and aggregate. DBObject applies to all other operations. |  | MongoDbOutputType
 | *writeConcern* (common) | Set the WriteConcern for write operations on MongoDB using the standard ones. Resolved from the fields of the WriteConcern class by calling the link WriteConcernvalueOf(String) method. | ACKNOWLEDGED | WriteConcern
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
@@ -254,6 +254,23 @@ changed even after a cursor is iterated, in which case the setting will
 apply on the next batch retrieval. |int/Integer
 |=======================================================================
 
+You can also "stream" the documents returned from the server into your route by setting outputType=DBCursor (Camel 2.16+) as an endpoint option,
+which may prove simpler than setting the above headers. This hands your Exchange the DBCursor from the Mongo driver, just as if you were executing
+findAll() within the Mongo shell, allowing your route to iterate over the results. Without this option, the component loads the documents from
+the driver's cursor into a List and returns it to your route, which may result in a large number of in-memory objects. Remember that with a
+DBCursor you should not ask for the number of documents matched; see the MongoDB documentation site for details.
+
+Example with option outputType=DBCursor and batch size :
+
+[source,java]
+-----------------------------------------------------------------------------
+from("direct:findAll")
+    .setHeader(MongoDbConstants.BATCH_SIZE).constant(10)
+    .setBody().constant("{ \"name\": \"Raul Kripalani\" }")
+    .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll&outputType=DBCursor")
+    .to("mock:resultFindAll");
+-----------------------------------------------------------------------------
+
 The `findAll` operation will also return the following OUT headers to
 enable you to iterate through result pages if you are using paging:
 
@@ -520,6 +537,34 @@ from("direct:aggregate")
     .to("mock:resultAggregate");
 ----------------------------------------------------------------------------------------------------------------------------------------------------------------------
 
+Efficient retrieval is supported via outputType=DBCursor and the following header :
+
+[width="100%",cols="10%,10%,10%,70%",options="header",]
+|=======================================================================
+|Header key |Quick constant |Description (extracted from MongoDB API doc) |Expected type
+
+|`CamelMongoDbBatchSize` |`MongoDbConstants.BATCH_SIZE` | Sets the number of documents to return per batch. |int/Integer
+|=======================================================================
+
+You can also "stream" the documents returned from the server into your route by setting outputType=DBCursor (Camel 2.21+) as an endpoint option,
+which may prove simpler than setting the above headers. This hands your Exchange the DBCursor from the Mongo driver, just as if you were executing
+aggregate() within the Mongo shell, allowing your route to iterate over the results. Without this option, the component loads the documents from
+the driver's cursor into a List and returns it to your route, which may result in a large number of in-memory objects. Remember that with a
+DBCursor you should not ask for the number of documents matched; see the MongoDB documentation site for details.
+
+Example with option outputType=DBCursor and batch size:
+
+[source,java]
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------
+// route: from("direct:aggregate").to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate");
+from("direct:aggregate")
+    .setHeader(MongoDbConstants.BATCH_SIZE).constant(10)
+    .setBody().constant("[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},{ $group: { _id: \"$scientist\", count: { $sum: 1 }} } ]")
+    .to("mongodb:myDb?database=science&collection=notableScientists&operation=aggregate&outputType=DBCursor")
+    .to("mock:resultAggregate");
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------
+
+
 #### getDbStats
 
 Equivalent of running the `db.stats()` command in the MongoDB shell,
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
index a13e436..f0192d9 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.mongodb;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -490,23 +491,40 @@ public class MongoDbProducer extends DefaultProducer {
             try {
                 MongoCollection<BasicDBObject> dbCol = calculateCollection(exchange);
                 DBObject query = exchange.getIn().getMandatoryBody(DBObject.class);
-
-                // Impossible with java driver to get the batch size and number to skip
-                List<BasicDBObject> dbIterator = new ArrayList<>();
-                AggregateIterable<BasicDBObject> aggregationResult;
-
+                
                 // Allow body to be a pipeline
                 // @see http://docs.mongodb.org/manual/core/aggregation/
+                List<Bson> queryList;
                 if (query instanceof BasicDBList) {
-                    List<Bson> queryList = ((BasicDBList) query).stream().map(o -> (Bson) o).collect(Collectors.toList());
-                    aggregationResult = dbCol.aggregate(queryList);
+                    queryList = ((BasicDBList) query).stream().map(o -> (Bson) o).collect(Collectors.toList());
+                } else {
+                    queryList = Arrays.asList((Bson) query);
+                }
+                
+                // the number to skip must be in body query
+                AggregateIterable<BasicDBObject> aggregationResult = dbCol.aggregate(queryList);
+
+                // get the batch size
+                Integer batchSize = exchange.getIn().getHeader(MongoDbConstants.BATCH_SIZE, Integer.class);
+                
+                if (batchSize != null) {
+                    aggregationResult.batchSize(batchSize);
+                }
+                
+                Iterable<BasicDBObject> result;
+                if (!MongoDbOutputType.DBCursor.equals(endpoint.getOutputType())) {
+                    // drain a single cursor into a List and close that same cursor when done
+                    List<BasicDBObject> resultList = new ArrayList<>();
+                    try (com.mongodb.client.MongoCursor<BasicDBObject> cursor = aggregationResult.iterator()) {
+                        cursor.forEachRemaining(resultList::add);
+                    }
+                    exchange.getOut().setHeader(MongoDbConstants.RESULT_PAGE_SIZE, resultList.size());
+                    result = resultList;
                 } else {
-                    List<Bson> queryList = new ArrayList<>();
-                    queryList.add((Bson) query);
-                    aggregationResult = dbCol.aggregate(queryList);
+                    result = aggregationResult;
                 }
-                aggregationResult.iterator().forEachRemaining(dbIterator::add);
-                return dbIterator;
+                
+                return result;
             } catch (InvalidPayloadException e) {
                 throw new CamelMongoDbException("Invalid payload for aggregate", e);
             }
diff --git a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbAggregateOperationTest.java b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbAggregateOperationTest.java
new file mode 100644
index 0000000..b547d23
--- /dev/null
+++ b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbAggregateOperationTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import java.util.List;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.client.MongoIterable;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class MongoDbAggregateOperationTest extends AbstractMongoDbTest {
+
+   
+    @Test
+    public void testAggregate() throws Exception {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.count());
+        pumpDataIntoTestCollection();
+
+        // result sorted by _id
+        Object result = template
+            .requestBody("direct:aggregate",
+                         "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},"
+                         + "{ $group: { _id: \"$scientist\", count: { $sum: 1 }} },{ $sort : { _id : 1}} ]");
+        
+        assertTrue("Result is not of type List", result instanceof List);
+
+        @SuppressWarnings("unchecked")
+        List<BasicDBObject> resultList = (List<BasicDBObject>) result;
+        assertListSize("Result does not contain 2 elements", resultList, 2);
+
+        assertEquals("First result DBObject._id should be Darwin", "Darwin", resultList.get(0).get("_id"));
+        assertEquals("First result DBObject.count should be 100", 100, resultList.get(0).get("count"));
+        assertEquals("Second result DBObject._id should be Einstein", "Einstein", resultList.get(1).get("_id"));
+        assertEquals("Second result DBObject.count should be 100", 100, resultList.get(1).get("count"));
+    }
+    
+    @Test
+    public void testAggregateDBCursor() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.count());
+        pumpDataIntoTestCollection();
+
+        Object result = template
+                .requestBody("direct:aggregateDBCursor",
+                        "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}}]");
+        
+        assertTrue("Result is not of type MongoIterable", result instanceof MongoIterable);
+
+        MongoIterable<BasicDBObject> resultCursor = (MongoIterable<BasicDBObject>) result;
+        // Ensure that all returned documents contain all fields
+        int count = 0;
+        for (DBObject dbObject : resultCursor) {
+            assertNotNull("DBObject in returned list should contain all fields", dbObject.get("_id"));
+            assertNotNull("DBObject in returned list should contain all fields", dbObject.get("scientist"));
+            assertNotNull("DBObject in returned list should contain all fields", dbObject.get("fixedField"));
+            count++;
+        }
+        assertEquals("Result does not contain 200 elements", 200, count);
+    }
+
+    @Test
+    public void testAggregateDBCursorBatchSize() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.count());
+        pumpDataIntoTestCollection();
+
+        Object result = template
+                .requestBodyAndHeader("direct:aggregateDBCursor",
+                        "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}}]", MongoDbConstants.BATCH_SIZE, 10);
+        
+        assertTrue("Result is not of type MongoIterable", result instanceof MongoIterable);
+
+        MongoIterable<BasicDBObject> resultCursor = (MongoIterable<BasicDBObject>) result;
+
+        // Ensure that all returned documents contain all fields
+        int count = 0;
+        for (DBObject dbObject : resultCursor) {
+            assertNotNull("DBObject in returned list should contain all fields", dbObject.get("_id"));
+            assertNotNull("DBObject in returned list should contain all fields", dbObject.get("scientist"));
+            assertNotNull("DBObject in returned list should contain all fields", dbObject.get("fixedField"));
+            count++;
+        }
+        assertEquals("Result does not contain 200 elements", 200, count);
+    }
+
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:aggregate")
+                    .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate");
+                from("direct:aggregateDBCursor")
+                    .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate&dynamicity=true&outputType=DBCursor")
+                    .to("mock:resultAggregateDBCursor");
+            }
+        };
+    }
+}
+
diff --git a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
index ee4a411..596cbe0 100644
--- a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
+++ b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
@@ -243,24 +243,6 @@ public class MongoDbOperationsTest extends AbstractMongoDbTest {
     }
     
     @Test
-    public void testAggregate() throws Exception {
-        // Test that the collection has 0 documents in it
-        assertEquals(0, testCollection.count());
-        pumpDataIntoTestCollection();
-
-        // Repeat ten times, obtain 10 batches of 100 results each time
-        Object result = template
-            .requestBody("direct:aggregate",
-                         "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},{ $group: { _id: \"$scientist\", count: { $sum: 1 }} } ]");
-        assertTrue("Result is not of type List", result instanceof List);
-
-        @SuppressWarnings("unchecked")
-        List<BasicDBObject> resultList = (List<BasicDBObject>)result;
-        assertListSize("Result does not contain 2 elements", resultList, 2);
-        // TODO Add more asserts
-    }
-    
-    @Test
     public void testDbStats() throws Exception {
         assertEquals(0, testCollection.count());
         Object result = template.requestBody("direct:getDbStats", "irrelevantBody");
@@ -330,7 +312,6 @@ public class MongoDbOperationsTest extends AbstractMongoDbTest {
                     setBody().header(MongoDbConstants.OID);
                 from("direct:update").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=update&writeConcern=SAFE");
                 from("direct:remove").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=remove&writeConcern=SAFE");
-                from("direct:aggregate").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate&writeConcern=SAFE");
                 from("direct:getDbStats").to("mongodb:myDb?database={{mongodb.testDb}}&operation=getDbStats");
                 from("direct:getColStats").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getColStats");
                 from("direct:command").to("mongodb:myDb?database={{mongodb.testDb}}&operation=command");
diff --git a/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc b/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc
index a47f9be..b236089 100644
--- a/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc
+++ b/components/camel-mongodb3/src/main/docs/mongodb3-component.adoc
@@ -88,7 +88,7 @@ with the following path and query parameters:
 | *createCollection* (common) | Create collection during initialisation if it doesn't exist. Default is true. | true | boolean
 | *database* (common) | Sets the name of the MongoDB database to target |  | String
 | *operation* (common) | Sets the operation this endpoint will execute against MongoDB. For possible values see MongoDbOperation. |  | MongoDbOperation
-| *outputType* (common) | Convert the output of the producer to the selected type : DocumentList Document or MongoIterable. DocumentList or Document applies to findAll. MongoIterable applies to all other operations. |  | MongoDbOutputType
+| *outputType* (common) | Convert the output of the producer to the selected type: DocumentList, Document or MongoIterable. DocumentList or MongoIterable apply to findAll and aggregate. Document applies to all other operations. |  | MongoDbOutputType
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
@@ -268,6 +268,17 @@ changed even after a cursor is iterated, in which case the setting will
 apply on the next batch retrieval. |int/Integer
 |=======================================================================
 
+Example with option outputType=MongoIterable and batch size :
+
+[source,java]
+-----------------------------------------------------------------------------
+from("direct:findAll")
+    .setHeader(MongoDbConstants.BATCH_SIZE).constant(10)
+    .setHeader(MongoDbConstants.CRITERIA, Filters.eq("name", "Raul Kripalani"))
+    .to("mongodb3:myDb?database=flights&collection=tickets&operation=findAll&outputType=MongoIterable")
+    .to("mock:resultFindAll");
+-----------------------------------------------------------------------------
+
 The `findAll` operation will also return the following OUT headers to
 enable you to iterate through result pages if you are using paging:
 
@@ -574,6 +585,37 @@ from("direct:aggregate")
     .to("mock:resultAggregate");
 ----------------------------------------------------------------------------------------------------------------------------------------------------------------------
 
+
+Efficient retrieval is supported via outputType=MongoIterable and the following header :
+
+[width="100%",cols="10%,10%,10%,70%",options="header",]
+|=======================================================================
+|Header key |Quick constant |Description (extracted from MongoDB API doc) |Expected type
+
+|`CamelMongoDbBatchSize` |`MongoDbConstants.BATCH_SIZE` | Sets the number of documents to return per batch. |int/Integer
+|=======================================================================
+
+You can also "stream" the documents returned from the server into your route by setting outputType=MongoIterable (Camel 2.21+) as an endpoint option,
+which may prove simpler than setting the above headers. This hands your Exchange the MongoIterable from the Mongo driver, just as if you were executing
+aggregate() within the Mongo shell, allowing your route to iterate over the results. Without this option, the component loads the documents from
+the driver's cursor into a List and returns it to your route, which may result in a large number of in-memory objects. Remember that with a
+MongoIterable you should not ask for the number of documents matched; see the MongoDB documentation site for details.
+
+Example with option outputType=MongoIterable and batch size:
+
+[source,java]
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------
+// route: from("direct:aggregate").to("mongodb3:myDb?database=science&collection=notableScientists&operation=aggregate&outputType=MongoIterable");
+List<Bson> aggregate = Arrays.asList(match(or(eq("scientist", "Darwin"), eq("scientist", "Einstein"))),
+        group("$scientist", sum("count", 1)));
+from("direct:aggregate")
+    .setHeader(MongoDbConstants.BATCH_SIZE).constant(10)
+    .setBody().constant(aggregate)
+    .to("mongodb3:myDb?database=science&collection=notableScientists&operation=aggregate&outputType=MongoIterable")
+    .to("mock:resultAggregate");
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------
+
+
 #### getDbStats
 
 Equivalent of running the `db.stats()` command in the MongoDB shell,
diff --git a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
index c9c5dd0..d56e83b 100644
--- a/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
+++ b/components/camel-mongodb3/src/main/java/org/apache/camel/component/mongodb3/MongoDbProducer.java
@@ -17,15 +17,13 @@
 package org.apache.camel.component.mongodb3;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import com.mongodb.BasicDBList;
-import com.mongodb.BasicDBObject;
-import com.mongodb.DBObject;
 import com.mongodb.client.AggregateIterable;
 import com.mongodb.client.DistinctIterable;
 import com.mongodb.client.FindIterable;
@@ -43,7 +41,6 @@ import org.apache.camel.TypeConverter;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ObjectHelper;
-//import org.bson.BsonDocument;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 import org.slf4j.Logger;
@@ -512,27 +509,42 @@ public class MongoDbProducer extends DefaultProducer {
         return exchange -> {
             try {
                 MongoCollection<Document> dbCol = calculateCollection(exchange);
-
-                // Impossible with java driver to get the batch size and number
-                // to skip
-                List<Document> dbIterator = new ArrayList<>();
-                AggregateIterable<Document> aggregationResult;
-
                 @SuppressWarnings("unchecked")
                 List<Bson> query = exchange.getIn().getMandatoryBody((Class<List<Bson>>)Class.class.cast(List.class));
-
+                
                 // Allow body to be a pipeline
                 // @see http://docs.mongodb.org/manual/core/aggregation/
+                List<Bson> queryList;
                 if (query != null) {
-                    List<Bson> queryList = query.stream().map(o -> (Bson)o).collect(Collectors.toList());
-                    aggregationResult = dbCol.aggregate(queryList);
+                    queryList = query.stream().map(o -> (Bson)o).collect(Collectors.toList());
+                } else {
+                    queryList = Arrays.asList(Bson.class.cast(exchange.getIn().getMandatoryBody(Bson.class)));
+                }
+                
+                // The number to skip must be in body query
+                AggregateIterable<Document> aggregationResult = dbCol.aggregate(queryList);
+                
+                // get the batch size
+                Integer batchSize = exchange.getIn().getHeader(MongoDbConstants.BATCH_SIZE, Integer.class);
+                
+                if (batchSize != null) {
+                    aggregationResult.batchSize(batchSize);
+                }
+                
+                Iterable<Document> result;                
+                if (!MongoDbOutputType.MongoIterable.equals(endpoint.getOutputType())) {
+                    // drain a single cursor into a List and close that same cursor when done
+                    List<Document> resultList = new ArrayList<>();
+                    try (com.mongodb.client.MongoCursor<Document> cursor = aggregationResult.iterator()) {
+                        cursor.forEachRemaining(resultList::add);
+                    }
+                    exchange.getOut().setHeader(MongoDbConstants.RESULT_PAGE_SIZE, resultList.size());
+                    result = resultList;
                 } else {
-                    List<Bson> queryList = new ArrayList<>();
-                    queryList.add(Bson.class.cast(exchange.getIn().getMandatoryBody(Bson.class)));
-                    aggregationResult = dbCol.aggregate(queryList);
+                    result = aggregationResult;
                 }
-                aggregationResult.iterator().forEachRemaining(dbIterator::add);
-                return dbIterator;
+                
+                return result;
             } catch (InvalidPayloadException e) {
                 throw new CamelMongoDbException("Invalid payload for aggregate", e);
             }
diff --git a/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbAggregateOperationTest.java b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbAggregateOperationTest.java
new file mode 100644
index 0000000..9241567
--- /dev/null
+++ b/components/camel-mongodb3/src/test/java/org/apache/camel/component/mongodb3/MongoDbAggregateOperationTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb3;
+
+import java.util.List;
+
+import com.mongodb.client.MongoIterable;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.bson.Document;
+import org.junit.Test;
+
+public class MongoDbAggregateOperationTest extends AbstractMongoDbTest {
+
+   
+    @Test
+    public void testAggregate() throws Exception {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.count());
+        pumpDataIntoTestCollection();
+
+        // result sorted by _id
+        Object result = template
+            .requestBody("direct:aggregate",
+                         "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},"
+                         + "{ $group: { _id: \"$scientist\", count: { $sum: 1 }} },{ $sort : { _id : 1}} ]");
+        
+        assertTrue("Result is not of type List", result instanceof List);
+
+        @SuppressWarnings("unchecked")
+        List<Document> resultList = (List<Document>) result;
+        assertListSize("Result does not contain 2 elements", resultList, 2);
+
+        assertEquals("First result Document._id should be Darwin", "Darwin", resultList.get(0).get("_id"));
+        assertEquals("First result Document.count should be 100", 100, resultList.get(0).get("count"));
+        assertEquals("Second result Document._id should be Einstein", "Einstein", resultList.get(1).get("_id"));
+        assertEquals("Second result Document.count should be 100", 100, resultList.get(1).get("count"));
+    }
+    
+    @Test
+    public void testAggregateDBCursor() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.count());
+        pumpDataIntoTestCollection();
+
+        Object result = template
+                .requestBody("direct:aggregateDBCursor",
+                        "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}}]");
+        
+        assertTrue("Result is not of type MongoIterable", result instanceof MongoIterable);
+
+        MongoIterable<Document> resultCursor = (MongoIterable<Document>) result;
+        // Ensure that all returned documents contain all fields
+        int count = 0;
+        for (Document document : resultCursor) {
+            assertNotNull("Document in returned list should contain all fields", document.get("_id"));
+            assertNotNull("Document in returned list should contain all fields", document.get("scientist"));
+            assertNotNull("Document in returned list should contain all fields", document.get("fixedField"));
+            count++;
+        }
+        assertEquals("Result does not contain 200 elements", 200, count);
+    }
+
+    @Test
+    public void testAggregateDBCursorBatchSize() {
+        // Test that the collection has 0 documents in it
+        assertEquals(0, testCollection.count());
+        pumpDataIntoTestCollection();
+
+        Object result = template
+                .requestBodyAndHeader("direct:aggregateDBCursor",
+                        "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}}]", MongoDbConstants.BATCH_SIZE, 10);
+
+        assertTrue("Result is not of type MongoIterable", result instanceof MongoIterable);
+
+        MongoIterable<Document> resultCursor = (MongoIterable<Document>) result;
+
+        // Ensure that all returned documents contain all fields
+        int count = 0;
+        for (Document document : resultCursor) {
+            assertNotNull("Document in returned list should contain all fields", document.get("_id"));
+            assertNotNull("Document in returned list should contain all fields", document.get("scientist"));
+            assertNotNull("Document in returned list should contain all fields", document.get("fixedField"));
+            count++;
+        }
+        assertEquals("Result does not contain 200 elements", 200, count);
+    }
+
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:aggregate")
+                    .to("mongodb3:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate");
+                from("direct:aggregateDBCursor")
+                    .to("mongodb3:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate&dynamicity=true&outputType=MongoIterable")
+                    .to("mock:resultAggregateDBCursor");
+            }
+        };
+    }
+}
