You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@camel.apache.org by da...@apache.org on 2016/06/03 08:01:43 UTC
[2/8] camel git commit: CAMEL-8668: MongoDbTailingProcess
CAMEL-8668: MongoDbTailingProcess
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/291286b3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/291286b3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/291286b3
Branch: refs/heads/master
Commit: 291286b38325c983ea1049f4a6a42afe8311e85a
Parents: 5b99703
Author: Arno Noordover <an...@users.noreply.github.com>
Authored: Mon May 30 00:03:59 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jun 3 09:56:37 2016 +0200
----------------------------------------------------------------------
.../component/mongodb/MongoDbEndpoint.java | 15 ++++++
.../mongodb/MongoDbTailingProcess.java | 56 +++++++++++---------
2 files changed, 47 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/291286b3/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index d6c3f46..6cfe3d4 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -31,6 +31,9 @@ import com.mongodb.MongoClient;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -101,6 +104,8 @@ public class MongoDbEndpoint extends DefaultEndpoint {
private DBCollection dbCollection;
private DB db;
+ private MongoDatabase mongoDatabase;
+ private MongoCollection<BasicDBObject> mongoCollection;
// ======= Constructors ===============================================
@@ -206,6 +211,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
throw new CamelMongoDbException("Missing required endpoint configuration: database and/or collection");
}
db = mongoConnection.getDB(database);
+ mongoDatabase = mongoConnection.getDatabase(database);
if (db == null) {
throw new CamelMongoDbException("Could not initialise MongoDbComponent. Database " + database + " does not exist.");
}
@@ -214,6 +220,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
throw new CamelMongoDbException("Could not initialise MongoDbComponent. Collection " + collection + " and createCollection is false.");
}
dbCollection = db.getCollection(collection);
+ mongoCollection = mongoDatabase.getCollection(collection, BasicDBObject.class);
LOG.debug("MongoDb component initialised and endpoint bound to MongoDB collection with the following parameters. Address list: {}, Db: {}, Collection: {}",
new Object[]{mongoConnection.getAllAddress().toString(), db.getName(), dbCollection.getName()});
@@ -638,4 +645,12 @@ public class MongoDbEndpoint extends DefaultEndpoint {
public void setOutputType(MongoDbOutputType outputType) {
this.outputType = outputType;
}
+
+ public MongoDatabase getMongoDatabase() {
+ return mongoDatabase;
+ }
+
+ public MongoCollection<BasicDBObject> getMongoCollection() {
+ return mongoCollection;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/291286b3/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
index 40a4a66..8c4dc20 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -20,14 +20,13 @@ package org.apache.camel.component.mongodb;
import java.util.concurrent.CountDownLatch;
import com.mongodb.BasicDBObject;
-import com.mongodb.Bytes;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
+import com.mongodb.CursorType;
import com.mongodb.DBObject;
import com.mongodb.MongoCursorNotFoundException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
import org.apache.camel.Exchange;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,43 +39,44 @@ public class MongoDbTailingProcess implements Runnable {
public volatile boolean stopped; // = false
private volatile CountDownLatch stoppedLatch;
- private final DBCollection dbCol;
+ private final MongoCollection<BasicDBObject> dbCol;
private final MongoDbEndpoint endpoint;
private final MongoDbTailableCursorConsumer consumer;
-
+
// create local, final copies of these variables for increased performance
private final long cursorRegenerationDelay;
private final boolean cursorRegenerationDelayEnabled;
-
- private DBCursor cursor;
+
+ private MongoCursor<BasicDBObject> cursor;
private MongoDbTailTrackingManager tailTracking;
-
+
public MongoDbTailingProcess(MongoDbEndpoint endpoint, MongoDbTailableCursorConsumer consumer, MongoDbTailTrackingManager tailTrack) {
this.endpoint = endpoint;
this.consumer = consumer;
- this.dbCol = endpoint.getDbCollection();
+ this.dbCol = endpoint.getMongoCollection();
this.tailTracking = tailTrack;
this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
this.cursorRegenerationDelayEnabled = !(this.cursorRegenerationDelay == 0);
}
- public DBCursor getCursor() {
+ public MongoCursor<BasicDBObject> getCursor() {
return cursor;
}
/**
* Initialise the tailing process, the cursor and if persistent tail tracking is enabled, recover the cursor from the persisted point.
* As part of the initialisation process, the component will validate that the collection we are targeting is 'capped'.
+ *
* @throws Exception
*/
public void initializeProcess() throws Exception {
if (LOG.isInfoEnabled()) {
- LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}", "db: " + dbCol.getDB() + ", col: " + dbCol.getName());
+ LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}", "db: " + endpoint.getMongoDatabase() + ", col: " + endpoint.getCollection());
}
- if (dbCol.getStats().getInt(CAPPED_KEY) != 1) {
- throw new CamelMongoDbException("Tailable cursors are only compatible with capped collections, and collection " + dbCol.getName()
+ if (!isCollectionCapped()) {
+ throw new CamelMongoDbException("Tailable cursors are only compatible with capped collections, and collection " + endpoint.getCollection()
+ " is not capped");
}
try {
@@ -90,7 +90,15 @@ public class MongoDbTailingProcess implements Runnable {
if (cursor == null) {
throw new CamelMongoDbException("Tailable cursor was not initialized, or cursor returned is dead on arrival");
}
-
+
+ }
+
+ private Boolean isCollectionCapped() {
+ return endpoint.getMongoDatabase().runCommand(createCollStatsCommand()).getBoolean(CAPPED_KEY);
+ }
+
+ private BasicDBObject createCollStatsCommand() {
+ return new BasicDBObject("collStats", endpoint.getCollection());
}
/**
@@ -126,7 +134,7 @@ public class MongoDbTailingProcess implements Runnable {
protected void stop() throws Exception {
if (LOG.isInfoEnabled()) {
- LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + dbCol.getDB() + ", col: " + dbCol.getName());
+ LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
}
keepRunning = false;
// close the cursor if it's open, so if it is blocked on hasNext() it will return immediately
@@ -135,7 +143,7 @@ public class MongoDbTailingProcess implements Runnable {
}
awaitStopped();
if (LOG.isInfoEnabled()) {
- LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + dbCol.getDB() + ", col: " + dbCol.getName());
+ LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + endpoint.getDatabase() + ", col: " + endpoint.getCollection());
}
}
@@ -145,7 +153,7 @@ public class MongoDbTailingProcess implements Runnable {
private void doRun() {
// while the cursor has more values, keepRunning is true and the cursorId is not 0, which symbolizes that the cursor is dead
try {
- while (cursor.hasNext() && cursor.getCursorId() != 0 && keepRunning) {
+ while (cursor.hasNext() && keepRunning) { //cursor.getCursorId() != 0 &&
DBObject dbObj = cursor.next();
Exchange exchange = endpoint.createMongoDbExchange(dbObj);
try {
@@ -172,15 +180,15 @@ public class MongoDbTailingProcess implements Runnable {
}
// no arguments, will ask DB what the last updated Id was (checking persistent storage)
- private DBCursor initializeCursor() {
+ private MongoCursor<BasicDBObject> initializeCursor() {
Object lastVal = tailTracking.lastVal;
// lastVal can be null if we are initializing and there is no persistence enabled
- DBCursor answer;
+ MongoCursor<BasicDBObject> answer;
if (lastVal == null) {
- answer = dbCol.find().addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);
+ answer = dbCol.find().cursorType(CursorType.TailableAwait).iterator();
} else {
- DBObject queryObj = new BasicDBObject(tailTracking.getIncreasingFieldName(), new BasicDBObject("$gt", lastVal));
- answer = dbCol.find(queryObj).addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);
+ BasicDBObject queryObj = new BasicDBObject(tailTracking.getIncreasingFieldName(), new BasicDBObject("$gt", lastVal));
+ answer = dbCol.find(queryObj).cursorType(CursorType.TailableAwait).iterator();
}
return answer;
}