You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/06/03 08:01:43 UTC

[2/8] camel git commit: CAMEL-8668: MongoDbTailingProcess

CAMEL-8668: MongoDbTailingProcess


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/291286b3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/291286b3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/291286b3

Branch: refs/heads/master
Commit: 291286b38325c983ea1049f4a6a42afe8311e85a
Parents: 5b99703
Author: Arno Noordover <an...@users.noreply.github.com>
Authored: Mon May 30 00:03:59 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jun 3 09:56:37 2016 +0200

----------------------------------------------------------------------
 .../component/mongodb/MongoDbEndpoint.java      | 15 ++++++
 .../mongodb/MongoDbTailingProcess.java          | 56 +++++++++++---------
 2 files changed, 47 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/291286b3/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index d6c3f46..6cfe3d4 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -31,6 +31,9 @@ import com.mongodb.MongoClient;
 import com.mongodb.ReadPreference;
 import com.mongodb.WriteConcern;
 import com.mongodb.WriteResult;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -101,6 +104,8 @@ public class MongoDbEndpoint extends DefaultEndpoint {
 
     private DBCollection dbCollection;
     private DB db;
+    private MongoDatabase mongoDatabase;
+    private MongoCollection<BasicDBObject> mongoCollection;
 
     // ======= Constructors ===============================================
 
@@ -206,6 +211,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
             throw new CamelMongoDbException("Missing required endpoint configuration: database and/or collection");
         }
         db = mongoConnection.getDB(database);
+        mongoDatabase = mongoConnection.getDatabase(database);
         if (db == null) {
             throw new CamelMongoDbException("Could not initialise MongoDbComponent. Database " + database + " does not exist.");
         }
@@ -214,6 +220,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
                 throw new CamelMongoDbException("Could not initialise MongoDbComponent. Collection " + collection + " and createCollection is false.");
             }
             dbCollection = db.getCollection(collection);
+            mongoCollection = mongoDatabase.getCollection(collection, BasicDBObject.class);
 
             LOG.debug("MongoDb component initialised and endpoint bound to MongoDB collection with the following parameters. Address list: {}, Db: {}, Collection: {}",
                     new Object[]{mongoConnection.getAllAddress().toString(), db.getName(), dbCollection.getName()});
@@ -638,4 +645,12 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     public void setOutputType(MongoDbOutputType outputType) {
         this.outputType = outputType;
     }
+
+    public MongoDatabase getMongoDatabase() {
+        return mongoDatabase;
+    }
+
+    public MongoCollection<BasicDBObject> getMongoCollection() {
+        return mongoCollection;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/291286b3/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
index 40a4a66..8c4dc20 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,14 +20,13 @@ package org.apache.camel.component.mongodb;
 import java.util.concurrent.CountDownLatch;
 
 import com.mongodb.BasicDBObject;
-import com.mongodb.Bytes;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
+import com.mongodb.CursorType;
 import com.mongodb.DBObject;
 import com.mongodb.MongoCursorNotFoundException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
 
 import org.apache.camel.Exchange;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,43 +39,44 @@ public class MongoDbTailingProcess implements Runnable {
     public volatile boolean stopped; // = false
     private volatile CountDownLatch stoppedLatch;
 
-    private final DBCollection dbCol;
+    private final MongoCollection<BasicDBObject> dbCol;
     private final MongoDbEndpoint endpoint;
     private final MongoDbTailableCursorConsumer consumer;
-    
+
     // create local, final copies of these variables for increased performance
     private final long cursorRegenerationDelay;
     private final boolean cursorRegenerationDelayEnabled;
-    
-    private DBCursor cursor;
+
+    private MongoCursor<BasicDBObject> cursor;
     private MongoDbTailTrackingManager tailTracking;
-    
+
 
     public MongoDbTailingProcess(MongoDbEndpoint endpoint, MongoDbTailableCursorConsumer consumer, MongoDbTailTrackingManager tailTrack) {
         this.endpoint = endpoint;
         this.consumer = consumer;
-        this.dbCol = endpoint.getDbCollection();
+        this.dbCol = endpoint.getMongoCollection();
         this.tailTracking = tailTrack;
         this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
         this.cursorRegenerationDelayEnabled = !(this.cursorRegenerationDelay == 0);
     }
 
-    public DBCursor getCursor() {
+    public MongoCursor<BasicDBObject> getCursor() {
         return cursor;
     }
 
     /**
      * Initialise the tailing process, the cursor and if persistent tail tracking is enabled, recover the cursor from the persisted point.
      * As part of the initialisation process, the component will validate that the collection we are targeting is 'capped'.
+     *
      * @throws Exception
      */
     public void initializeProcess() throws Exception {
         if (LOG.isInfoEnabled()) {
-            LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}", "db: " + dbCol.getDB() + ", col: " + dbCol.getName());
+            LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}", "db: " + endpoint.getMongoDatabase() + ", col: " + endpoint.getCollection());
         }
 
-        if (dbCol.getStats().getInt(CAPPED_KEY) != 1) {
-            throw new CamelMongoDbException("Tailable cursors are only compatible with capped collections, and collection " + dbCol.getName()
+        if (!isCollectionCapped()) {
+            throw new CamelMongoDbException("Tailable cursors are only compatible with capped collections, and collection " + endpoint.getCollection()
                     + " is not capped");
         }
         try {
@@ -90,7 +90,15 @@ public class MongoDbTailingProcess implements Runnable {
         if (cursor == null) {
             throw new CamelMongoDbException("Tailable cursor was not initialized, or cursor returned is dead on arrival");
         }
-        
+
+    }
+
+    private Boolean isCollectionCapped() {
+        return endpoint.getMongoDatabase().runCommand(createCollStatsCommand()).getBoolean(CAPPED_KEY);
+    }
+
+    private BasicDBObject createCollStatsCommand() {
+        return new BasicDBObject("collStats", endpoint.getCollection());
     }
 
     /**
@@ -126,7 +134,7 @@ public class MongoDbTailingProcess implements Runnable {
 
     protected void stop() throws Exception {
         if (LOG.isInfoEnabled()) {
-            LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + dbCol.getDB() + ", col: " + dbCol.getName());
+            LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
         }
         keepRunning = false;
         // close the cursor if it's open, so if it is blocked on hasNext() it will return immediately
@@ -135,7 +143,7 @@ public class MongoDbTailingProcess implements Runnable {
         }
         awaitStopped();
         if (LOG.isInfoEnabled()) {
-            LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + dbCol.getDB() + ", col: " + dbCol.getName());
+            LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
         }
     }
 
@@ -145,7 +153,7 @@ public class MongoDbTailingProcess implements Runnable {
     private void doRun() {
         // while the cursor has more values, keepRunning is true and the cursorId is not 0, which symbolizes that the cursor is dead
         try {
-            while (cursor.hasNext() && cursor.getCursorId() != 0  && keepRunning) {
+            while (cursor.hasNext() && keepRunning) { //cursor.getCursorId() != 0 &&
                 DBObject dbObj = cursor.next();
                 Exchange exchange = endpoint.createMongoDbExchange(dbObj);
                 try {
@@ -172,15 +180,15 @@ public class MongoDbTailingProcess implements Runnable {
     }
 
     // no arguments, will ask DB what the last updated Id was (checking persistent storage)
-    private DBCursor initializeCursor() {
+    private MongoCursor<BasicDBObject> initializeCursor() {
         Object lastVal = tailTracking.lastVal;
         // lastVal can be null if we are initializing and there is no persistence enabled
-        DBCursor answer;
+        MongoCursor<BasicDBObject> answer;
         if (lastVal == null) {
-            answer = dbCol.find().addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);
+            answer = dbCol.find().cursorType(CursorType.TailableAwait).iterator();
         } else {
-            DBObject queryObj = new BasicDBObject(tailTracking.getIncreasingFieldName(), new BasicDBObject("$gt", lastVal));
-            answer = dbCol.find(queryObj).addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);
+            BasicDBObject queryObj = new BasicDBObject(tailTracking.getIncreasingFieldName(), new BasicDBObject("$gt", lastVal));
+            answer = dbCol.find(queryObj).cursorType(CursorType.TailableAwait).iterator();
         }
         return answer;
     }