You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/01/07 09:56:27 UTC

[1/4] camel git commit: Added support for oplog tracking. At the same time added interface for extending the strategy of extraction of the tailTrackingIncreasingField and creation of the cursor used to track the tail or a collection.

Repository: camel
Updated Branches:
  refs/heads/master b20726641 -> 7a1bf6922


Added support for oplog tracking. At the same time added interface for extending the strategy of extraction of the tailTrackingIncreasingField and creation of the cursor used to track the tail or a collection.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3cf2a11f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3cf2a11f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3cf2a11f

Branch: refs/heads/master
Commit: 3cf2a11fcae1f8502d425780039fa3eda9d23663
Parents: b207266
Author: gilfernandes <gi...@gmail.com>
Authored: Fri Jan 6 19:18:28 2017 +0000
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sat Jan 7 10:52:28 2017 +0100

----------------------------------------------------------------------
 components/camel-mongodb/pom.xml                |   6 +
 .../src/main/docs/mongodb-component.adoc        | 122 ++++++++++++++++++-
 .../mongodb/MongoDBTailTrackingEnum.java        |  52 ++++++++
 .../mongodb/MongoDBTailTrackingStrategy.java    |  42 +++++++
 .../component/mongodb/MongoDbEndpoint.java      |  43 ++++++-
 .../mongodb/MongoDbTailTrackingConfig.java      |  34 +++---
 .../mongodb/MongoDbTailTrackingManager.java     |  31 +++--
 .../mongodb/MongoDbTailingProcess.java          |   9 +-
 .../MongoDBTailTrackingStrategyTest.java        |  75 ++++++++++++
 9 files changed, 376 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/pom.xml b/components/camel-mongodb/pom.xml
index 8f04d97..81f3045 100644
--- a/components/camel-mongodb/pom.xml
+++ b/components/camel-mongodb/pom.xml
@@ -83,6 +83,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>de.flapdoodle.embed</groupId>
       <artifactId>de.flapdoodle.embed.mongo</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/docs/mongodb-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/docs/mongodb-component.adoc b/components/camel-mongodb/src/main/docs/mongodb-component.adoc
index ca4dd5a..b63b4e7 100644
--- a/components/camel-mongodb/src/main/docs/mongodb-component.adoc
+++ b/components/camel-mongodb/src/main/docs/mongodb-component.adoc
@@ -66,7 +66,7 @@ The MongoDB component has no options.
 
 
 // endpoint options: START
-The MongoDB component supports 22 endpoint options which are listed below:
+The MongoDB component supports 24 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -85,15 +85,17 @@ The MongoDB component supports 22 endpoint options which are listed below:
 | exchangePattern | consumer (advanced) |  | ExchangePattern | Sets the exchange pattern when the consumer creates an exchange.
 | cursorRegenerationDelay | advanced | 1000 | long | MongoDB tailable cursors will block until new data arrives. If no new data is inserted after some time the cursor will be automatically freed and closed by the MongoDB server. The client is expected to regenerate the cursor if needed. This value specifies the time to wait before attempting to fetch a new cursor and if the attempt fails how long before the next attempt is made. Default value is 1000ms.
 | dynamicity | advanced | false | boolean | Sets whether this endpoint will attempt to dynamically resolve the target database and collection from the incoming Exchange properties. Can be used to override at runtime the database and collection specified on the otherwise static endpoint URI. It is disabled by default to boost performance. Enabling it will take a minimal performance hit.
-| readPreference | advanced |  | ReadPreference | Sets a MongoDB ReadPreference on the Mongo connection. Read preferences set directly on the connection will be overridden by this setting. The link com.mongodb.ReadPreferencevalueOf(String) utility method is used to resolve the passed readPreference value. Some examples for the possible values are nearest primary or secondary etc.
+| readPreference | advanced |  | ReadPreference | Sets a MongoDB ReadPreference on the Mongo connection. Read preferences set directly on the connection will be overridden by this setting. The link ReadPreferencevalueOf(String) utility method is used to resolve the passed readPreference value. Some examples for the possible values are nearest primary or secondary etc.
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
 | writeResultAsHeader | advanced | false | boolean | In write operations it determines whether instead of returning WriteResult as the body of the OUT message we transfer the IN message to the OUT and attach the WriteResult as a header.
 | persistentId | tail |  | String | One tail tracking collection can host many trackers for several tailable consumers. To keep them separate each tracker should have its own unique persistentId.
 | persistentTailTracking | tail | false | boolean | Enable persistent tail tracking which is a mechanism to keep track of the last consumed message across system restarts. The next time the system is up the endpoint will recover the cursor from the point where it last stopped slurping records.
+| persistRecords | tail | -1 | int | Sets the number of tailed records after which the tail tracking data is persisted to MongoDB.
 | tailTrackCollection | tail |  | String | Collection where tail tracking information will be persisted. If not specified link MongoDbTailTrackingConfigDEFAULT_COLLECTION will be used by default.
 | tailTrackDb | tail |  | String | Indicates what database the tail tracking mechanism will persist to. If not specified the current database will be picked by default. Dynamicity will not be taken into account even if enabled i.e. the tail tracking database will not vary past endpoint initialisation.
 | tailTrackField | tail |  | String | Field where the last tracked value will be placed. If not specified link MongoDbTailTrackingConfigDEFAULT_FIELD will be used by default.
 | tailTrackIncreasingField | tail |  | String | Correlation field in the incoming record which is of increasing nature and will be used to position the tailing cursor every time it is generated. The cursor will be (re)created with a query of type: tailTrackIncreasingField lastValue (possibly recovered from persistent tail tracking). Can be of type Integer Date String etc. NOTE: No support for dot notation at the current time so the field should be at the top level of the document.
+| tailTrackingStrategy | tail | LITERAL | MongoDBTailTrackingEnum | Sets the strategy used to extract the increasing field value and to create the query to position the tail cursor.
 |=======================================================================
 {% endraw %}
 // endpoint options: END
@@ -812,6 +814,122 @@ from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasing
     .to("mock:test");
 -----------------------------------------------------------------------------------------------------------------------------------
 
+[[MongoDB-TailtrackingOnOplog]]
+Oplog Tail Tracking
+^^^^^^^^^^^^^^^^^^^
+
+The *oplog* collection tracking feature allows to implement trigger like functionality in MongoDB.
+In order to activate this collection you will have first to activate a replica set. For more
+information on this topic please check https://docs.mongodb.com/manual/tutorial/deploy-replica-set/ .
+
+Below you can find an example of a Java DSL based route demonstrating how you can use the component to track the *oplog*
+collection. In this specific case we are filtering the events which affect a collection *customers* in
+database *optlog_test*. Note that the `tailTrackIncreasingField` is a timestamp field ('ts') which implies
+that you have to use the `tailTrackingStrategy` parameter with the *TIMESTAMP* value.
+
+[source,java]
+-----------------------------------------------------------------------------------------------------------------------------------
+import com.mongodb.BasicDBObject;
+import com.mongodb.MongoClient;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDBTailTrackingEnum;
+import org.apache.camel.main.Main;
+
+import java.io.InputStream;
+
+/**
+ * For this to work you need to turn on the replica set
+ * <p>
+ * Commands to create a replica set:
+ * <p>
+ * rs.initiate( {
+ * _id : "rs0",
+ * members: [ { _id : 0, host : "localhost:27017" } ]
+ * })
+ */
+public class MongoDbTracker {
+
+    private final String database;
+
+    private final String collection;
+
+    private final String increasingField;
+
+    private MongoDBTailTrackingEnum trackingStrategy;
+
+    private int persistRecords = -1;
+
+    private boolean persistenTailTracking;
+
+    public MongoDbTracker(String database, String collection, String increasingField) {
+        this.database = database;
+        this.collection = collection;
+        this.increasingField = increasingField;
+    }
+
+    public static void main(String[] args) throws Exception {
+        final MongoDbTracker mongoDbTracker = new MongoDbTracker("local", "oplog.rs", "ts");
+        mongoDbTracker.setTrackingStrategy(MongoDBTailTrackingEnum.TIMESTAMP);
+        mongoDbTracker.setPersistRecords(5);
+        mongoDbTracker.setPersistenTailTracking(true);
+        mongoDbTracker.startRouter();
+        // run until you terminate the JVM
+        System.out.println("Starting Camel. Use ctrl + c to terminate the JVM.\n");
+
+    }
+
+    public void setTrackingStrategy(MongoDBTailTrackingEnum trackingStrategy) {
+        this.trackingStrategy = trackingStrategy;
+    }
+
+    public void setPersistRecords(int persistRecords) {
+        this.persistRecords = persistRecords;
+    }
+
+    public void setPersistenTailTracking(boolean persistenTailTracking) {
+        this.persistenTailTracking = persistenTailTracking;
+    }
+
+    void startRouter() throws Exception {
+        // create a Main instance
+        Main main = new Main();
+        main.bind(MongoConstants.CONN_NAME, new MongoClient("localhost", 27017));
+        main.addRouteBuilder(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                getContext().getTypeConverterRegistry().addTypeConverter(InputStream.class, BasicDBObject.class,
+                        new MongoToInputStreamConverter());
+                from("mongodb://" + MongoConstants.CONN_NAME + "?database=" + database
+                        + "&collection=" + collection
+                        + "&persistentTailTracking=" + persistenTailTracking
+                        + "&persistentId=trackerName" + "&tailTrackDb=local"
+                        + "&tailTrackCollection=talendTailTracking"
+                        + "&tailTrackField=lastTrackingValue"
+                        + "&tailTrackIncreasingField=" + increasingField
+                        + "&tailTrackingStrategy=" + trackingStrategy.toString()
+                        + "&persistRecords=" + persistRecords
+                        + "&cursorRegenerationDelay=1000")
+                        .filter().jsonpath("$[?(@.ns=='optlog_test.customers')]")
+                        .id("logger")
+                        .to("log:logger?level=WARN")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                Message message = exchange.getIn();
+                                System.out.println(message.getBody().toString());
+                                exchange.getOut().setBody(message.getBody().toString());
+                            }
+                        });
+            }
+        });
+        main.run();
+    }
+}
+-----------------------------------------------------------------------------------------------------------------------------------
+
+
 [[MongoDB-Typeconversions]]
 Type conversions
 ~~~~~~~~~~~~~~~~

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingEnum.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingEnum.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingEnum.java
new file mode 100644
index 0000000..65d5797
--- /dev/null
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingEnum.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import org.bson.types.BSONTimestamp;
+
+/**
+ * Contains the concrete MongoDB tail tracking strategies.
+ */
+public enum MongoDBTailTrackingEnum implements MongoDBTailTrackingStrategy {
+
+    TIMESTAMP {
+
+        @Override
+        public Object extractLastVal(DBObject o, String increasingField) {
+            Object temp = o.get(increasingField);
+            return ((BSONTimestamp) temp).getTime();
+        }
+
+        @Override
+        public BasicDBObject createQuery(Object lastVal, String increasingField) {
+            return new BasicDBObject(increasingField, new BasicDBObject("$gt", new BSONTimestamp((Integer)lastVal, 1)));
+        }
+    }, LITERAL {
+
+        @Override
+        public Object extractLastVal(DBObject o, String increasingField) {
+            return o.get(increasingField);
+        }
+
+        @Override
+        public BasicDBObject createQuery(Object lastVal, String increasingField) {
+            return new BasicDBObject(increasingField, new BasicDBObject("$gt", lastVal));
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategy.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategy.java
new file mode 100644
index 0000000..18a634c
--- /dev/null
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+
+/**
+ * This class contains different methods for extracting and saving tail tracking information.
+ */
+public interface MongoDBTailTrackingStrategy {
+
+    /**
+     * Extracts the last tracking value using the field name or an expression.
+     * @param o The object retrieved by the trailing process.
+     * @param increasingField The field name or an expression used to extract the last value.
+     * @return an object representing the last tracking value in a MongoDB collection.
+     */
+    Object extractLastVal(DBObject o, String increasingField);
+
+    /**
+     * Creates an object to be used in a query using the last tracking value.
+     * @param lastVal The last tracking value.
+     * @param increasingField The field name or an expression used to extract the last value.
+     * @return the object to be used in a MongoDB query.
+     */
+    BasicDBObject createQuery(Object lastVal, String increasingField);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index 4ce84c7..15cc261 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -58,7 +58,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     private static final Logger LOG = LoggerFactory.getLogger(MongoDbEndpoint.class);
 
     private MongoClient mongoConnection;
-    
+
     @UriPath @Metadata(required = "true")
     private String connectionBean;
     @UriParam
@@ -103,6 +103,12 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     @UriParam
     private MongoDbOutputType outputType;
 
+    @UriParam(label = "tail", defaultValue = "LITERAL")
+    private MongoDBTailTrackingEnum tailTrackingStrategy;
+
+    @UriParam(label = "tail", defaultValue = "-1")
+    private int persistRecords;
+
     private MongoDatabase mongoDatabase;
     private MongoCollection<BasicDBObject> mongoCollection;
 
@@ -182,6 +188,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
                 if (persistentTailTracking && (ObjectHelper.isEmpty(persistentId))) {
                     throw new IllegalArgumentException("persistentId is compulsory for persistent tail tracking");
                 }
+                if (persistentTailTracking && (ObjectHelper.isEmpty(persistentId))) {
+                    throw new IllegalArgumentException("persistentId is compulsory for persistent tail tracking");
+                }
             }
 
         } else {
@@ -290,7 +299,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         setWriteReadOptionsOnConnection();
         super.doStart();
     }
-    
+
     @Override
     protected void doStop() throws Exception {
         super.doStop();
@@ -460,7 +469,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
      * Sets a MongoDB {@link ReadPreference} on the Mongo connection. Read preferences set directly on the connection will be
      * overridden by this setting.
      * <p/>
-     * The {@link com.mongodb.ReadPreference#valueOf(String)} utility method is used to resolve the passed {@code readPreference}
+     * The {@link ReadPreference#valueOf(String)} utility method is used to resolve the passed {@code readPreference}
      * value. Some examples for the possible values are {@code nearest}, {@code primary} or {@code secondary} etc.
      * 
      * @param readPreference the name of the read preference to set
@@ -586,7 +595,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     public MongoDbTailTrackingConfig getTailTrackingConfig() {
         if (tailTrackingConfig == null) {
             tailTrackingConfig = new MongoDbTailTrackingConfig(persistentTailTracking, tailTrackIncreasingField, tailTrackDb == null ? database : tailTrackDb, tailTrackCollection,
-                    tailTrackField, getPersistentId());
+                    tailTrackField, getPersistentId(), tailTrackingStrategy);
         }
         return tailTrackingConfig;
     }
@@ -655,4 +664,30 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     public MongoCollection<BasicDBObject> getMongoCollection() {
         return mongoCollection;
     }
+
+    public MongoDBTailTrackingEnum getTailTrackingStrategy() {
+        return tailTrackingStrategy;
+    }
+
+    /**
+     * Sets the strategy used to extract the increasing field value and to create the query to position the
+     * tail cursor.
+     * @param tailTrackingStrategy The strategy used to extract the increasing field value and to create the query to position the
+     * tail cursor.
+     */
+    public void setTailTrackingStrategy(MongoDBTailTrackingEnum tailTrackingStrategy) {
+        this.tailTrackingStrategy = tailTrackingStrategy;
+    }
+
+    public int getPersistRecords() {
+        return persistRecords;
+    }
+
+    /**
+     * Sets the number of tailed records after which the tail tracking data is persisted to MongoDB.
+     * @param persistRecords The number of tailed records after which the tail tracking data is persisted to MongoDB.
+     */
+    public void setPersistRecords(int persistRecords) {
+        this.persistRecords = persistRecords;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
index 7b998f5..c446d4d 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
@@ -18,17 +18,9 @@ package org.apache.camel.component.mongodb;
 
 public class MongoDbTailTrackingConfig {
     
-    public static final String DEFAULT_COLLECTION = "camelTailTracking";
-    public static final String DEFAULT_FIELD = "lastTrackingValue";
-    
-    /**
-     * See {@link MongoDbEndpoint#setTailTrackIncreasingField(String)}
-     */
-    public final String increasingField;
-    /**
-     * See {@link MongoDbEndpoint#setPersistentTailTracking(boolean)}
-     */
-    public final boolean persistent;
+    static final String DEFAULT_COLLECTION = "camelTailTracking";
+    static final String DEFAULT_FIELD = "lastTrackingValue";
+
     /**
      * See {@link MongoDbEndpoint#setTailTrackDb(String)}
      */
@@ -38,21 +30,35 @@ public class MongoDbTailTrackingConfig {
      */
     public final String collection;
     /**
+     * See {@link MongoDbEndpoint#setTailTrackIncreasingField(String)}
+     */
+    final String increasingField;
+    /**
+     * See {@link MongoDbEndpoint#setPersistentTailTracking(boolean)}
+     */
+    final boolean persistent;
+    /**
      * See {@link MongoDbEndpoint#setTailTrackField(String)}
      */
-    public final String field;
+    final String field;
     /**
      * See {@link MongoDbEndpoint#setPersistentId(String)}
      */
-    public final String persistentId;
+    final String persistentId;
+
+    /**
+     * See {@link MongoDbEndpoint#setTailTrackingStrategy(MongoDBTailTrackingEnum)}
+     */
+    final MongoDBTailTrackingEnum mongoDBTailTrackingStrategy;
     
     public MongoDbTailTrackingConfig(boolean persistentTailTracking, String tailTrackIncreasingField, String tailTrackDb,
-            String tailTrackCollection, String tailTrackField, String persistentId) {
+            String tailTrackCollection, String tailTrackField, String persistentId, MongoDBTailTrackingEnum mongoDBTailTrackingStrategy) {
         this.increasingField = tailTrackIncreasingField;
         this.persistent = persistentTailTracking;
         this.db = tailTrackDb;
         this.persistentId = persistentId;
         this.collection = tailTrackCollection == null ? MongoDbTailTrackingConfig.DEFAULT_COLLECTION : tailTrackCollection;
         this.field = tailTrackField == null ? MongoDbTailTrackingConfig.DEFAULT_FIELD : tailTrackField;
+        this.mongoDBTailTrackingStrategy = mongoDBTailTrackingStrategy == null ? MongoDBTailTrackingEnum.LITERAL : mongoDBTailTrackingStrategy;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
index 7e74a7e..68cf83a 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
@@ -20,31 +20,29 @@ import com.mongodb.BasicDBObject;
 import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.client.MongoCollection;
-
+import org.bson.types.BSONTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MongoDbTailTrackingManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailTrackingManager.class);
-    
     public Object lastVal;
-
     private final MongoClient connection;
     private final MongoDbTailTrackingConfig config;
     private MongoCollection<BasicDBObject> dbCol;
     private BasicDBObject trackingObj;
-    
+
     public MongoDbTailTrackingManager(MongoClient connection, MongoDbTailTrackingConfig config) {
         this.connection = connection;
         this.config = config;
     }
-    
+
     public void initialize() throws Exception {
         if (!config.persistent) {
             return;
         }
-        
+
         dbCol = connection.getDatabase(config.db).getCollection(config.collection, BasicDBObject.class);
         BasicDBObject filter = new BasicDBObject("persistentId", config.persistentId);
         trackingObj = dbCol.find(filter).first();
@@ -55,43 +53,42 @@ public class MongoDbTailTrackingManager {
         // keep only the _id, the rest is useless and causes more overhead during update
         trackingObj = new BasicDBObject("_id", trackingObj.get("_id"));
     }
-    
+
     public synchronized void persistToStore() {
         if (!config.persistent || lastVal == null) {
             return;
         }
-        
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Persisting lastVal={} to store, collection: {}", lastVal, config.collection);
         }
-        
+
         BasicDBObject updateObj = new BasicDBObject().append("$set", new BasicDBObject(config.field, lastVal));
         dbCol.updateOne(trackingObj, updateObj);
         trackingObj = dbCol.find().first();
     }
-    
+
     public synchronized Object recoverFromStore() {
         if (!config.persistent) {
             return null;
         }
-        
+
         lastVal = dbCol.find(trackingObj).first().get(config.field);
-        
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Recovered lastVal={} from store, collection: {}", lastVal, config.collection);
         }
-        
+
         return lastVal;
     }
-    
+
     public void setLastVal(DBObject o) {
         if (config.increasingField == null) {
             return;
         }
-        
-        lastVal = o.get(config.increasingField);
+        lastVal = config.mongoDBTailTrackingStrategy.extractLastVal(o, config.increasingField);
     }
-    
+
     public String getIncreasingFieldName() {
         return config.increasingField;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
index 4c5fb8a..2a1d574 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
@@ -151,6 +151,9 @@ public class MongoDbTailingProcess implements Runnable {
      * The heart of the tailing process.
      */
     private void doRun() {
+        int counter = 0;
+        int persistRecords = endpoint.getPersistRecords();
+        boolean persistRegularly = persistRecords > 0;
         // while the cursor has more values, keepRunning is true and the cursorId is not 0, which symbolizes that the cursor is dead
         try {
             while (cursor.hasNext() && keepRunning) { //cursor.getCursorId() != 0 &&
@@ -165,6 +168,9 @@ public class MongoDbTailingProcess implements Runnable {
                     // do nothing
                 }
                 tailTracking.setLastVal(dbObj);
+                if (persistRegularly && counter++ % persistRecords == 0) {
+                    tailTracking.persistToStore();
+                }
             }
         } catch (MongoCursorNotFoundException e) {
             // we only log the warning if we are not stopping, otherwise it is expected because the stop() method kills the cursor just in case it is blocked
@@ -187,7 +193,8 @@ public class MongoDbTailingProcess implements Runnable {
         if (lastVal == null) {
             answer = dbCol.find().cursorType(CursorType.TailableAwait).iterator();
         } else {
-            BasicDBObject queryObj = new BasicDBObject(tailTracking.getIncreasingFieldName(), new BasicDBObject("$gt", lastVal));
+            final String increasingFieldName = tailTracking.getIncreasingFieldName();
+            BasicDBObject queryObj = endpoint.getTailTrackingStrategy().createQuery(lastVal, increasingFieldName);
             answer = dbCol.find(queryObj).cursorType(CursorType.TailableAwait).iterator();
         }
         return answer;

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategyTest.java b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategyTest.java
new file mode 100644
index 0000000..2534701
--- /dev/null
+++ b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategyTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import org.bson.types.BSONTimestamp;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MongoDBTailTrackingStrategyTest {
+
+    private static final String INCREASING_FIELD_NAME = "ts";
+
+    @Test
+    public void testExtractLastValForLiterals() throws Exception {
+        int expected = 1483701465;
+        DBObject o = mock(DBObject.class);
+        when(o.get(INCREASING_FIELD_NAME)).thenReturn(expected);
+        Object lastVal = MongoDBTailTrackingEnum.LITERAL.extractLastVal(o, INCREASING_FIELD_NAME);
+        assertThat(lastVal, is(expected));
+    }
+
+    @Test
+    public void testCreateQueryForLiterals() {
+        Integer lastVal = 1483701465;
+        BasicDBObject basicDBObject = MongoDBTailTrackingEnum.LITERAL.createQuery(lastVal, INCREASING_FIELD_NAME);
+        final Object actual = basicDBObject.get(INCREASING_FIELD_NAME);
+        assertThat(actual, is(notNullValue()));
+        assertThat(actual instanceof BasicDBObject, is(true));
+        assertThat(((BasicDBObject)actual).get("$gt"), is(lastVal));
+    }
+
+    @Test
+    public void testExtractLastValForTimestamp() throws Exception {
+        DBObject o = mock(DBObject.class);
+        final int lastVal = 1483701465;
+        when(o.get(INCREASING_FIELD_NAME)).thenReturn(new BSONTimestamp(lastVal, 1));
+        Object res = MongoDBTailTrackingEnum.TIMESTAMP.extractLastVal(o, INCREASING_FIELD_NAME);
+        assertThat(res, is(lastVal));
+    }
+
+    @Test
+    public void testExtracCreateQueryForTimestamp() throws Exception {
+        final int lastVal = 1483701465;
+        BasicDBObject basicDBObject = MongoDBTailTrackingEnum.TIMESTAMP.createQuery(lastVal, INCREASING_FIELD_NAME);
+        final Object actual = basicDBObject.get(INCREASING_FIELD_NAME);
+        assertThat(actual, is(notNullValue()));
+        assertThat(actual instanceof BasicDBObject, is(true));
+        assertThat(((BasicDBObject)actual).get("$gt") instanceof BSONTimestamp, is(true));
+        BSONTimestamp bsonTimestamp = (BSONTimestamp) ((BasicDBObject)actual).get("$gt");
+        assertThat(bsonTimestamp.getTime(), is(lastVal));
+    }
+
+
+}
\ No newline at end of file


[4/4] camel git commit: Fixed CS

Posted by ac...@apache.org.
Fixed CS


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7a1bf692
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7a1bf692
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7a1bf692

Branch: refs/heads/master
Commit: 7a1bf6922fbf3293564950ebeaf1412cd909d844
Parents: 24a2d04
Author: Andrea Cosentino <an...@gmail.com>
Authored: Sat Jan 7 10:54:35 2017 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sat Jan 7 10:54:35 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/component/mongodb/MongoDbEndpoint.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7a1bf692/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index aa75b15..35e5eee 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -663,7 +663,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     public MongoDBTailTrackingEnum getTailTrackingStrategy() {
-        if(tailTrackingStrategy == null) {
+        if (tailTrackingStrategy == null) {
             tailTrackingStrategy = MongoDBTailTrackingEnum.LITERAL;
         }
         return tailTrackingStrategy;


[3/4] camel git commit: Default tail tracking strategy must always exist. The endpoint getter always must ensure that it is never null.

Posted by ac...@apache.org.
Default tail tracking strategy must always exist. The endpoint getter always must ensure that it is never null.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/24a2d04f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/24a2d04f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/24a2d04f

Branch: refs/heads/master
Commit: 24a2d04fa51cc4e941daa4e264027bdde6fe607c
Parents: bc4bf1b
Author: gilfernandes <gi...@gmail.com>
Authored: Sat Jan 7 09:51:32 2017 +0000
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sat Jan 7 10:52:29 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/component/mongodb/MongoDbEndpoint.java  | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/24a2d04f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index 0b2081a..aa75b15 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -663,6 +663,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     }
 
     public MongoDBTailTrackingEnum getTailTrackingStrategy() {
+        if(tailTrackingStrategy == null) {
+            tailTrackingStrategy = MongoDBTailTrackingEnum.LITERAL;
+        }
         return tailTrackingStrategy;
     }
 


[2/4] camel git commit: Removed duplicate code.

Posted by ac...@apache.org.
Removed duplicate code.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bc4bf1bb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bc4bf1bb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bc4bf1bb

Branch: refs/heads/master
Commit: bc4bf1bbd5f599743fa426a773b940cc4cf52815
Parents: 3cf2a11
Author: gilfernandes <gi...@gmail.com>
Authored: Sat Jan 7 09:03:43 2017 +0000
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sat Jan 7 10:52:29 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/component/mongodb/MongoDbEndpoint.java  | 3 ---
 1 file changed, 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bc4bf1bb/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index 15cc261..0b2081a 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -188,9 +188,6 @@ public class MongoDbEndpoint extends DefaultEndpoint {
                 if (persistentTailTracking && (ObjectHelper.isEmpty(persistentId))) {
                     throw new IllegalArgumentException("persistentId is compulsory for persistent tail tracking");
                 }
-                if (persistentTailTracking && (ObjectHelper.isEmpty(persistentId))) {
-                    throw new IllegalArgumentException("persistentId is compulsory for persistent tail tracking");
-                }
             }
 
         } else {