You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/02/16 15:09:57 UTC
svn commit: r1244980 [1/3] - in /camel/trunk: components/
components/camel-mongodb/ components/camel-mongodb/src/
components/camel-mongodb/src/main/ components/camel-mongodb/src/main/java/
components/camel-mongodb/src/main/java/org/ components/camel-mo...
Author: davsclaus
Date: Thu Feb 16 14:09:55 2012
New Revision: 1244980
URL: http://svn.apache.org/viewvc?rev=1244980&view=rev
Log:
CAMEL-4878: New camel-mongodb component. Thanks to Raul for the patch.
Added:
camel/trunk/components/camel-mongodb/ (with props)
camel/trunk/components/camel-mongodb/pom.xml (with props)
camel/trunk/components/camel-mongodb/src/
camel/trunk/components/camel-mongodb/src/main/
camel/trunk/components/camel-mongodb/src/main/java/
camel/trunk/components/camel-mongodb/src/main/java/org/
camel/trunk/components/camel-mongodb/src/main/java/org/apache/
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java (with props)
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java (with props)
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java (with props)
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java (with props)
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java (with props)
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java (with props)
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java (with props)
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java (with props)
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java (with props)
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java (with props)
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java (with props)
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/converters/
camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/converters/MongoDbBasicConverters.java (with props)
camel/trunk/components/camel-mongodb/src/main/resources/
camel/trunk/components/camel-mongodb/src/main/resources/LICENSE.txt (with props)
camel/trunk/components/camel-mongodb/src/main/resources/META-INF/
camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/
camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/org/
camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/org/apache/
camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/org/apache/camel/
camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/org/apache/camel/component/
camel/trunk/components/camel-mongodb/src/main/resources/META-INF/services/org/apache/camel/component/mongodb
camel/trunk/components/camel-mongodb/src/main/resources/NOTICE.txt (with props)
camel/trunk/components/camel-mongodb/src/test/
camel/trunk/components/camel-mongodb/src/test/java/
camel/trunk/components/camel-mongodb/src/test/java/org/
camel/trunk/components/camel-mongodb/src/test/java/org/apache/
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/AbstractMongoDbTest.java (with props)
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbConversionsTest.java (with props)
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbDynamicityTest.java (with props)
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbExceptionHandlingTest.java (with props)
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbFindOperationTest.java (with props)
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java (with props)
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumerTest.java (with props)
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbWriteConcernsTest.java (with props)
camel/trunk/components/camel-mongodb/src/test/resources/
camel/trunk/components/camel-mongodb/src/test/resources/log4j.properties (with props)
camel/trunk/components/camel-mongodb/src/test/resources/mongodb.test.properties (with props)
camel/trunk/components/camel-mongodb/src/test/resources/org/
camel/trunk/components/camel-mongodb/src/test/resources/org/apache/
camel/trunk/components/camel-mongodb/src/test/resources/org/apache/camel/
camel/trunk/components/camel-mongodb/src/test/resources/org/apache/camel/component/
camel/trunk/components/camel-mongodb/src/test/resources/org/apache/camel/component/mongodb/
camel/trunk/components/camel-mongodb/src/test/resources/org/apache/camel/component/mongodb/mongoComponentTest.xml (with props)
Modified:
camel/trunk/components/pom.xml
camel/trunk/parent/pom.xml
Propchange: camel/trunk/components/camel-mongodb/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Feb 16 14:09:55 2012
@@ -0,0 +1,17 @@
+.pmd
+.checkstyle
+.ruleset
+target
+.settings
+.classpath
+.project
+.wtpmodules
+prj.el
+.jdee_classpath
+.jdee_sources
+velocity.log
+eclipse-classes
+*.ipr
+*.iml
+*.iws
+*.idea
Added: camel/trunk/components/camel-mongodb/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/pom.xml?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/pom.xml (added)
+++ camel/trunk/components/camel-mongodb/pom.xml Thu Feb 16 14:09:55 2012
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>components</artifactId>
+ <groupId>org.apache.camel</groupId>
+ <version>2.10-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-mongodb</artifactId>
+ <packaging>bundle</packaging>
+ <name>Camel :: MongoDB</name>
+ <description>Camel MongoDB component</description>
+
+ <properties>
+ <camel.osgi.export.pkg>org.apache.camel.component.mongodb.*</camel.osgi.export.pkg>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ </dependency>
+
+ <!-- MongoDB driver dependency -->
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <version>${mongo-java-driver-version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>${jackson-version}</version>
+ <optional>true</optional>
+ </dependency>
+
+ <!-- testing -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-spring</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
Propchange: camel/trunk/components/camel-mongodb/pom.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mongodb/pom.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: camel/trunk/components/camel-mongodb/pom.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+public class CamelMongoDbException extends Exception { // checked exception (extends Exception) of the camel-mongodb component
+ /** Fixed serialization version identifier. */
+ private static final long serialVersionUID = 7834484945432331909L;
+ /** Creates an exception carrying both a detail message and the underlying cause. */
+ public CamelMongoDbException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ /** Creates an exception carrying a detail message only. */
+ public CamelMongoDbException(String message) {
+ super(message);
+ }
+ /** Wraps an existing throwable without adding a message of its own. */
+ public CamelMongoDbException(Throwable cause) {
+ super(cause);
+ }
+
+}
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/CamelMongoDbException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import java.util.Map;
+
+import com.mongodb.Mongo;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.CamelContextHelper;
+
+/**
+ * Represents the component that manages {@link MongoDbEndpoint}; each endpoint is bound to a {@link Mongo} bean named in the URI.
+ */
+public class MongoDbComponent extends DefaultComponent {
+
+ /**
+ * Resolves the Mongo bean named by the remaining URI part (should be a singleton) and hands it to the endpoint as "mongoConnection".
+ */
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ Mongo db = CamelContextHelper.mandatoryLookup(getCamelContext(), remaining, Mongo.class);
+ // the connection travels through the parameter map so that setProperties() applies it like any other option
+ Endpoint endpoint = new MongoDbEndpoint(uri, this);
+ parameters.put("mongoConnection", db);
+ setProperties(endpoint, parameters);
+
+ return endpoint;
+ }
+ /** Returns t itself when it already is a CamelMongoDbException, otherwise wraps it in a new one. */
+ public static CamelMongoDbException wrapInCamelMongoDbException(Throwable t) {
+ if (t instanceof CamelMongoDbException) {
+ return (CamelMongoDbException) t;
+ } else {
+ return new CamelMongoDbException(t);
+ }
+ }
+
+
+}
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+public final class MongoDbConstants { // non-instantiable holder of the component's string constants
+ // DATABASE, COLLECTION and FROM_TAILABLE are set as message headers by MongoDbEndpoint#createMongoDbExchange; the others are presumably header names as well - verify against MongoDbProducer
+ public static final String OPERATION_HEADER = "CamelMongoDbOperation";
+ public static final String RESULT_TOTAL_SIZE = "CamelMongoDbResultTotalSize";
+ public static final String RESULT_PAGE_SIZE = "CamelMongoDbResultPageSize";
+ public static final String FIELDS_FILTER = "CamelMongoDbFieldsFilter";
+ public static final String BATCH_SIZE = "CamelMongoDbBatchSize";
+ public static final String NUM_TO_SKIP = "CamelMongoDbNumToSkip";
+ public static final String INSERT_RECORDS_AFFECTED = "CamelMongoDbInsertRecordsAffected";
+ public static final String LAST_ERROR = "CamelMongoDbLastError";
+ public static final String MULTIUPDATE = "CamelMongoDbMultiUpdate";
+ public static final String UPSERT = "CamelMongoDbUpsert";
+ public static final String RECORDS_AFFECTED = "CamelMongoDbRecordsAffected";
+ public static final String SORT_BY = "CamelMongoDbSortBy";
+ public static final String DATABASE = "CamelMongoDbDatabase";
+ public static final String COLLECTION = "CamelMongoDbCollection";
+ public static final String WRITECONCERN = "CamelMongoDbWriteConcern";
+ public static final String LIMIT = "CamelMongoDbLimit";
+ public static final String FROM_TAILABLE = "CamelMongoDbTailable";
+ /** Private constructor: this class only exposes constants and is never instantiated. */
+ private MongoDbConstants() { }
+
+}
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+public enum MongoDbConsumerType {
+ // the only consumer type so far; MongoDbEndpoint#createConsumer falls back to it when no consumerType is configured
+ tailable
+ // more consumer types to be included in future versions
+
+}
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConsumerType.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,503 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.Mongo;
+import com.mongodb.ReadPreference;
+import com.mongodb.WriteConcern;
+import com.mongodb.WriteResult;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.util.ObjectHelper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a MongoDb endpoint.
+ * It is responsible for creating {@link MongoDbProducer} and {@link MongoDbTailableCursorConsumer} instances.
+ * It accepts a number of options to customise the behaviour of consumers and producers.
+ */
+public class MongoDbEndpoint extends DefaultEndpoint {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MongoDbEndpoint.class);
+ private Mongo mongoConnection;
+ private String database;
+ private String collection;
+ private MongoDbOperation operation;
+ private boolean createCollection = true;
+ private boolean invokeGetLastError; // = false
+ private WriteConcern writeConcern;
+ private WriteConcern writeConcernRef;
+ private ReadPreference readPreference;
+ private boolean dynamicity; // = false
+ // tailable cursor consumer by default
+ private MongoDbConsumerType consumerType;
+ private long cursorRegenerationDelay = 1000L;
+ private String tailTrackIncreasingField;
+
+ // persistent tail tracking
+ private boolean persistentTailTracking; // = false;
+ private String persistentId;
+ private String tailTrackDb;
+ private String tailTrackCollection;
+ private String tailTrackField;
+
+ private MongoDbTailTrackingConfig tailTrackingConfig;
+
+ private DBCollection dbCollection;
+ private DB db;
+
+ // ======= Constructors ===============================================
+
+ public MongoDbEndpoint() { // no-arg constructor; every option keeps its field default
+ }
+ // constructor used by MongoDbComponent#createEndpoint
+ public MongoDbEndpoint(String uri, MongoDbComponent component) {
+ super(uri, component);
+ }
+ // URI-only variant; the suppressed warning suggests the matching DefaultEndpoint constructor is deprecated - TODO confirm
+ @SuppressWarnings("deprecation")
+ public MongoDbEndpoint(String endpointUri) {
+ super(endpointUri);
+ }
+
+ // ======= Implementation methods =====================================
+
+ public Producer createProducer() throws Exception { // validates the options, binds to MongoDB, then builds the producer
+ validateOptions('P'); // 'P' selects the producer rule set
+ initializeConnection();
+ return new MongoDbProducer(this);
+ }
+
+ public Consumer createConsumer(Processor processor) throws Exception { // validates, binds to MongoDB, then builds the consumer for consumerType
+ validateOptions('C'); // 'C' selects the consumer rule set
+ // we never create the collection
+ createCollection = false; // initializeConnection() then fails if the collection does not exist
+ initializeConnection();
+
+ // select right consumer type
+ if (consumerType == null) {
+ consumerType = MongoDbConsumerType.tailable;
+ }
+ // NOTE(review): the default above is applied only after validateOptions('C') has already run
+ Consumer consumer = null;
+ if (consumerType == MongoDbConsumerType.tailable) {
+ consumer = new MongoDbTailableCursorConsumer(this, processor);
+ } else {
+ throw new CamelMongoDbException("Consumer type not supported: " + consumerType);
+ }
+
+ return consumer;
+ }
+
+ private void validateOptions(char role) throws IllegalArgumentException {
+ // Best-effort check that no option of the other role was supplied ('P' = producer, 'C' = consumer). Options with defaults are compared
+ // against their default value, so explicitly setting an option to its default goes unnoticed, but it is enough.
+ if (role == 'P') {
+ if (!ObjectHelper.isEmpty(consumerType) || persistentTailTracking || !ObjectHelper.isEmpty(tailTrackDb)
+ || !ObjectHelper.isEmpty(tailTrackCollection) || !ObjectHelper.isEmpty(tailTrackField) || cursorRegenerationDelay != 1000L) {
+ throw new IllegalArgumentException("consumerType, tailTracking, cursorRegenerationDelay options cannot appear on a producer endpoint");
+ }
+ } else if (role == 'C') {
+ if (!ObjectHelper.isEmpty(operation) || !ObjectHelper.isEmpty(writeConcern) || writeConcernRef != null
+ || readPreference != null || dynamicity || invokeGetLastError) {
+ throw new IllegalArgumentException("operation, writeConcern, writeConcernRef, readPreference, dynamicity, invokeGetLastError "
+ + "options cannot appear on a consumer endpoint");
+ }
+ // createConsumer() defaults a null consumerType to tailable after this method returns, so null must be validated as tailable too
+ if (consumerType == null || consumerType == MongoDbConsumerType.tailable) {
+ if (tailTrackIncreasingField == null) {
+ throw new IllegalArgumentException("tailTrackIncreasingField option must be set for tailable cursor MongoDB consumer endpoint");
+ }
+ if (persistentTailTracking && (ObjectHelper.isEmpty(persistentId))) {
+ throw new IllegalArgumentException("persistentId is compulsory for persistent tail tracking");
+ }
+ }
+
+ } else {
+ throw new IllegalArgumentException("Unknown endpoint role");
+ }
+ }
+
+ public boolean isSingleton() { // this endpoint always reports itself as a singleton
+ return true;
+ }
+
+ /**
+ * Initialises the MongoDB connection using the Mongo object provided to the endpoint and binds the endpoint to its database and collection.
+ * @throws CamelMongoDbException if database or collection is not configured, or if the collection is missing while createCollection is false
+ */
+ public void initializeConnection() throws CamelMongoDbException {
+ LOG.info("Initialising MongoDb endpoint: {}", this);
+ if (database == null || collection == null) {
+ throw new CamelMongoDbException("Missing required endpoint configuration: database and/or collection");
+ }
+ db = mongoConnection.getDB(database);
+ if (db == null) {
+ throw new CamelMongoDbException("Could not initialise MongoDbComponent. Database " + database + " does not exist.");
+ }
+ if (!createCollection && !db.collectionExists(collection)) {
+ throw new CamelMongoDbException("Could not initialise MongoDbComponent. Collection " + collection + " does not exist and createCollection is false.");
+ }
+ dbCollection = db.getCollection(collection);
+ // log the resolved binding so that a misconfigured endpoint is easy to spot
+ LOG.info("MongoDb component initialised and endpoint bound to MongoDB collection with the following parameters. Address list: {}, Db: {}, Collection: {}",
+ new Object[] {mongoConnection.getAllAddress().toString(), db.getName(), dbCollection.getName()});
+ }
+
+ /**
+ * Rejects endpoints that set both writeConcern and writeConcernRef; otherwise pushes the write/read options to the connection and continues start-up.
+ */
+ @Override
+ protected void doStart() throws Exception {
+ if (writeConcern != null && writeConcernRef != null) {
+ LOG.error("Cannot set both writeConcern and writeConcernRef at the same time. Respective values: {}, {}. "
+ + "Aborting initialization.", new Object[] {writeConcern, writeConcernRef});
+ throw new IllegalArgumentException("Cannot set both writeConcern and writeConcernRef at the same time on MongoDB endpoint");
+ }
+ // from here on at most one of the two write concern options is set
+ setWriteReadOptionsOnConnection();
+ super.doStart();
+ }
+
+ public Exchange createMongoDbExchange(DBObject dbObj) { // wraps dbObj in a new exchange using this endpoint's exchange pattern
+ Exchange exchange = new DefaultExchange(this.getCamelContext(), getExchangePattern());
+ Message message = new DefaultMessage();
+ message.setHeader(MongoDbConstants.DATABASE, database);
+ message.setHeader(MongoDbConstants.COLLECTION, collection);
+ message.setHeader(MongoDbConstants.FROM_TAILABLE, true);
+ // the DBObject itself (not a copy) becomes the body of the in-message
+ message.setBody(dbObj);
+ exchange.setIn(message);
+ return exchange;
+ }
+
+ private void setWriteReadOptionsOnConnection() { // applies this endpoint's write concern / read preference to the Mongo object itself
+ // Set the WriteConcern (writeConcern is checked first; doStart() rejects endpoints that set both options)
+ if (writeConcern != null) {
+ mongoConnection.setWriteConcern(writeConcern);
+ } else if (writeConcernRef != null) {
+ mongoConnection.setWriteConcern(writeConcernRef);
+ }
+ // NOTE(review): both settings land on the Mongo instance, so other users of that instance presumably see them too - confirm
+ // Set the ReadPreference
+ if (readPreference != null) {
+ mongoConnection.setReadPreference(readPreference);
+ }
+ }
+
+
+ // ======= Getters and setters ===============================================
+
+ /**
+ * Sets the name of the MongoDB collection to bind to this endpoint; initializeConnection() fails when it is left unset
+ * @param collection collection name
+ */
+ public void setCollection(String collection) {
+ this.collection = collection;
+ }
+ /** Returns the configured collection name. */
+ public String getCollection() {
+ return collection;
+ }
+
+ /**
+ * Sets the operation this endpoint will execute against MongoDB. For possible values, see {@link MongoDbOperation}.
+ * @param operation name of the operation as per catalogued values
+ * @throws CamelMongoDbException if MongoDbOperation.valueOf rejects the name with an IllegalArgumentException
+ */
+ public void setOperation(String operation) throws CamelMongoDbException {
+ try {
+ this.operation = MongoDbOperation.valueOf(operation);
+ } catch (IllegalArgumentException e) {
+ throw new CamelMongoDbException("Operation not supported", e);
+ }
+ }
+ /** Returns the resolved operation; the field has no initial value, so this is null until setOperation succeeds. */
+ public MongoDbOperation getOperation() {
+ return operation;
+ }
+
+ /**
+ * Sets the name of the MongoDB database to target; initializeConnection() fails when it is left unset
+ * @param database name of the MongoDB database
+ */
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+ /** Returns the configured database name. */
+ public String getDatabase() {
+ return database;
+ }
+
+ /**
+ * Create collection during initialisation if it doesn't exist. Default is true; createConsumer() always resets it to false.
+ * @param createCollection true or false
+ */
+ public void setCreateCollection(boolean createCollection) {
+ this.createCollection = createCollection;
+ }
+ /** Returns the current value of the createCollection flag. */
+ public boolean isCreateCollection() {
+ return createCollection;
+ }
+
+ public DB getDb() { // database handle assigned by initializeConnection()
+ return db;
+ }
+ // collection handle assigned by initializeConnection()
+ public DBCollection getDbCollection() {
+ return dbCollection;
+ }
+
+ /**
+ * Sets the Mongo instance that represents the backing connection (MongoDbComponent passes it in as the "mongoConnection" parameter)
+ * @param mongoConnection the connection to the database
+ */
+ public void setMongoConnection(Mongo mongoConnection) {
+ this.mongoConnection = mongoConnection;
+ }
+ /** Returns the Mongo instance backing this endpoint. */
+ public Mongo getMongoConnection() {
+ return mongoConnection;
+ }
+
+ /**
+ * Set the {@link WriteConcern} for write operations on MongoDB using the standard ones.
+ * Resolved from the fields of the WriteConcern class by calling the {@link WriteConcern#valueOf(String)} method.
+ * @param writeConcern the standard name of the WriteConcern
+ * @see <a href="http://api.mongodb.org/java/current/com/mongodb/WriteConcern.html#valueOf(java.lang.String)">possible options</a>
+ */
+ public void setWriteConcern(String writeConcern) {
+ this.writeConcern = WriteConcern.valueOf(writeConcern); // NOTE(review): result is stored unchecked - confirm what valueOf yields for an unknown name
+ }
+ /** Returns the WriteConcern resolved by setWriteConcern. */
+ public WriteConcern getWriteConcern() {
+ return writeConcern;
+ }
+
+ /**
+ * Instructs this endpoint to invoke {@link WriteResult#getLastError()} with every operation. By default, MongoDB does not wait
+ * for the write operation to occur before returning. If set to true, each exchange will only return after the write operation
+ * has actually occurred in MongoDB.
+ * The producer also uses this flag to decide whether to publish the last error as a message header.
+ * @param invokeGetLastError true to invoke getLastError after each write, false otherwise
+ */
+ public void setInvokeGetLastError(boolean invokeGetLastError) {
+ this.invokeGetLastError = invokeGetLastError;
+ }
+
+ /**
+ * Returns the flag stored through {@link #setInvokeGetLastError(boolean)}.
+ * @return the current value of the invokeGetLastError flag
+ */
+ public boolean isInvokeGetLastError() {
+ return invokeGetLastError;
+ }
+
+ /**
+ * Looks up a custom {@link WriteConcern} bean in the Camel Registry and stores it for write operations on MongoDB.
+ * Standard WriteConcerns can be selected by key instead, through {@link #setWriteConcern(String) setWriteConcern}.
+ * @param writeConcernRef the name of the bean in the registry that represents the WriteConcern to use
+ * @throws IllegalArgumentException if the registry holds no WriteConcern under that name
+ */
+ public void setWriteConcernRef(String writeConcernRef) {
+     final WriteConcern resolved = getCamelContext().getRegistry().lookup(writeConcernRef, WriteConcern.class);
+     if (resolved != null) {
+         this.writeConcernRef = resolved;
+         return;
+     }
+
+     // nothing registered under that name: report it and abort
+     LOG.error("Camel MongoDB component could not find the WriteConcern in the Registry. Verify that the "
+             + "provided bean name ({}) is correct. Aborting initialization.", writeConcernRef);
+     throw new IllegalArgumentException("Camel MongoDB component could not find the WriteConcern in the Registry");
+ }
+
+ /**
+ * Returns the {@link WriteConcern} bean resolved from the Registry by {@link #setWriteConcernRef(String)}.
+ * Note that this is the resolved object, not the bean name.
+ * @return the stored write concern
+ */
+ public WriteConcern getWriteConcernRef() {
+ return writeConcernRef;
+ }
+
+ /**
+ * Sets a MongoDB {@link ReadPreference} on the Mongo connection. Read preferences set directly on the connection will be
+ * overridden by this setting.
+ * The value is matched against the nested classes of {@link ReadPreference} that directly extend it; both the binary
+ * class name (e.g. <tt>com.mongodb.ReadPreference$Xyz</tt>) and the simple class name are accepted. The matching class
+ * is instantiated through its public no-argument constructor.
+ * @param readPreference the class name of the read preference to set
+ * @throws IllegalArgumentException if no matching nested class could be found and instantiated
+ */
+ public void setReadPreference(String readPreference) {
+     for (Class<?> candidate : ReadPreference.class.getDeclaredClasses()) {
+         if (candidate.getSuperclass() != ReadPreference.class) {
+             continue;
+         }
+         if (!candidate.getName().equals(readPreference) && !candidate.getSimpleName().equals(readPreference)) {
+             continue;
+         }
+         try {
+             // look up the real no-arg constructor; passing a single null element asks for a one-argument
+             // constructor and can never match
+             this.readPreference = (ReadPreference) candidate.getConstructor().newInstance();
+             // resolved: return here so the failure path below is not reached
+             return;
+         } catch (Exception e) {
+             // keep scanning, but leave a trace of why this candidate was rejected
+             LOG.warn("Could not instantiate ReadPreference class " + candidate.getName(), e);
+         }
+     }
+
+     LOG.error("Could not resolve specified ReadPreference of type {}. Read preferences are resolved from inner "
+             + "classes of com.mongodb.ReadPreference.", readPreference);
+     throw new IllegalArgumentException("MongoDB endpoint could not resolve specified ReadPreference");
+ }
+
+ /**
+ * Returns the {@link ReadPreference} instance resolved by {@link #setReadPreference(String)}.
+ * @return the stored read preference
+ */
+ public ReadPreference getReadPreference() {
+ return readPreference;
+ }
+
+ /**
+ * Sets whether this endpoint will attempt to dynamically resolve the target database and collection from the headers of the
+ * incoming message ({@link MongoDbConstants#DATABASE} and {@link MongoDbConstants#COLLECTION}).
+ * Can be used to override at runtime the database and collection specified on the otherwise static endpoint URI.
+ * It is disabled by default to boost performance. Enabling it will take a minimal performance hit.
+ * @see MongoDbConstants#DATABASE
+ * @see MongoDbConstants#COLLECTION
+ * @param dynamicity true or false indicating whether target database and collection should be calculated dynamically based on message headers.
+ */
+ public void setDynamicity(boolean dynamicity) {
+ this.dynamicity = dynamicity;
+ }
+
+ /**
+ * Returns the flag stored through {@link #setDynamicity(boolean)}.
+ * @return the current value of the dynamicity flag
+ */
+ public boolean isDynamicity() {
+ return dynamicity;
+ }
+
+ /**
+ * Reserved for future use, when more consumer types are supported.
+ * The key is resolved against the constants of {@link MongoDbConsumerType}.
+ * @param consumerType key of the consumer type
+ * @throws CamelMongoDbException if the key does not name a {@link MongoDbConsumerType} constant
+ */
+ public void setConsumerType(String consumerType) throws CamelMongoDbException {
+     final MongoDbConsumerType resolved;
+     try {
+         resolved = MongoDbConsumerType.valueOf(consumerType);
+     } catch (IllegalArgumentException unsupported) {
+         // translate the enum lookup failure into the component's own exception type
+         throw new CamelMongoDbException("Consumer type not supported", unsupported);
+     }
+     this.consumerType = resolved;
+ }
+
+ /**
+ * Returns the consumer type resolved by {@link #setConsumerType(String)}.
+ * @return the stored consumer type
+ */
+ public MongoDbConsumerType getConsumerType() {
+ return consumerType;
+ }
+
+ /**
+ * Returns the tail tracking database name exactly as stored through {@link #setTailTrackDb(String)}.
+ * The fallback to the endpoint's database is applied in {@link #getTailTrackingConfig()}, not here.
+ * @return the stored tail tracking database name
+ */
+ public String getTailTrackDb() {
+ return tailTrackDb;
+ }
+
+ /**
+ * Indicates what database the tail tracking mechanism will persist to. If not specified, the endpoint's current
+ * database will be picked by default. Dynamicity will not be taken into account even if enabled, i.e. the tail
+ * tracking database will not vary past endpoint initialisation.
+ * @param tailTrackDb database name
+ */
+ public void setTailTrackDb(String tailTrackDb) {
+ this.tailTrackDb = tailTrackDb;
+ }
+
+ /**
+ * Returns the tail tracking collection name exactly as stored through {@link #setTailTrackCollection(String)}.
+ * The default is applied by {@link MongoDbTailTrackingConfig}, not here.
+ * @return the stored tail tracking collection name
+ */
+ public String getTailTrackCollection() {
+ return tailTrackCollection;
+ }
+
+ /**
+ * Sets the collection where tail tracking information will be persisted. If not specified,
+ * {@link MongoDbTailTrackingConfig#DEFAULT_COLLECTION} will be used by default.
+ * @param tailTrackCollection collection name
+ */
+ public void setTailTrackCollection(String tailTrackCollection) {
+ this.tailTrackCollection = tailTrackCollection;
+ }
+
+ /**
+ * Returns the tail tracking field name exactly as stored through {@link #setTailTrackField(String)}.
+ * The default is applied by {@link MongoDbTailTrackingConfig}, not here.
+ * @return the stored tail tracking field name
+ */
+ public String getTailTrackField() {
+ return tailTrackField;
+ }
+
+ /**
+ * Sets the field where the last tracked value will be placed. If not specified,
+ * {@link MongoDbTailTrackingConfig#DEFAULT_FIELD} will be used by default.
+ * @param tailTrackField field name
+ */
+ public void setTailTrackField(String tailTrackField) {
+ this.tailTrackField = tailTrackField;
+ }
+
+ /**
+ * Enables persistent tail tracking, which is a mechanism to keep track of the last consumed message across system restarts.
+ * The next time the system is up, the endpoint will recover the cursor from the point where it last stopped slurping records.
+ * @param persistentTailTracking true to enable persistent tail tracking, false otherwise
+ */
+ public void setPersistentTailTracking(boolean persistentTailTracking) {
+ this.persistentTailTracking = persistentTailTracking;
+ }
+
+ /**
+ * Returns the flag stored through {@link #setPersistentTailTracking(boolean)}.
+ * @return the current value of the persistentTailTracking flag
+ */
+ public boolean isPersistentTailTracking() {
+ return persistentTailTracking;
+ }
+
+ /**
+ * Correlation field in the incoming record which is of increasing nature and will be used to position the tailing cursor every
+ * time it is generated.
+ * The cursor will be (re)created with a query of type: {@code tailTrackIncreasingField > lastValue} (possibly recovered from
+ * persistent tail tracking).
+ * Can be of type Integer, Date, String, etc.
+ * NOTE: No support for dot notation at the current time, so the field should be at the top level of the document.
+ * @param tailTrackIncreasingField name of the increasing field in the tailed documents
+ */
+ public void setTailTrackIncreasingField(String tailTrackIncreasingField) {
+ this.tailTrackIncreasingField = tailTrackIncreasingField;
+ }
+
+ /**
+ * Returns the field name stored through {@link #setTailTrackIncreasingField(String)}.
+ * @return the stored increasing field name
+ */
+ public String getTailTrackIncreasingField() {
+ return tailTrackIncreasingField;
+ }
+
+ public MongoDbTailTrackingConfig getTailTrackingConfig() {
+ if (tailTrackingConfig == null) {
+ tailTrackingConfig = new MongoDbTailTrackingConfig(persistentTailTracking, tailTrackIncreasingField,
+ tailTrackDb == null ? database : tailTrackDb, tailTrackCollection, tailTrackField, getPersistentId());
+ }
+ return tailTrackingConfig;
+ }
+
+ /**
+ * MongoDB tailable cursors will block until new data arrives. If no new data is inserted, after some time the cursor will be automatically
+ * freed and closed by the MongoDB server. The client is expected to regenerate the cursor if needed. This value specifies the time to wait
+ * before attempting to fetch a new cursor, and if the attempt fails, how long before the next attempt is made. Default value is 1000ms.
+ * The value is stored as given; no range check is applied by this setter.
+ * @param cursorRegenerationDelay delay specified in milliseconds
+ */
+ public void setCursorRegenerationDelay(long cursorRegenerationDelay) {
+ this.cursorRegenerationDelay = cursorRegenerationDelay;
+ }
+
+ /**
+ * Returns the delay stored through {@link #setCursorRegenerationDelay(long)}.
+ * @return the cursor regeneration delay, in milliseconds
+ */
+ public long getCursorRegenerationDelay() {
+ return cursorRegenerationDelay;
+ }
+
+ /**
+ * One tail tracking collection can host many trackers for several tailable consumers.
+ * To keep them separate, each tracker should have its own unique persistentId.
+ * Uniqueness is not checked by this setter.
+ * @param persistentId the value of the persistent ID to use for this tailable consumer
+ */
+ public void setPersistentId(String persistentId) {
+ this.persistentId = persistentId;
+ }
+
+ /**
+ * Returns the identifier stored through {@link #setPersistentId(String)}.
+ * @return the stored persistent ID
+ */
+ public String getPersistentId() {
+ return persistentId;
+ }
+
+}
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+/**
+ * Operations that the MongoDB producer knows how to dispatch. The producer resolves a constant either from the
+ * endpoint configuration or from a message header (by instance or by name) and routes it to the matching handler.
+ */
+public enum MongoDbOperation {
+
+ // read operations
+ // findById: single-document lookup using the message body as the id
+ findById,
+ // findOneByQuery: single-document lookup using the message body as the query object
+ findOneByQuery,
+ // findAll: cursor-based query; an empty body selects every document in the collection
+ findAll,
+ // group, // future
+ // mapReduce, // future
+
+ // create/update operations
+ // insert: body is a single object or a List of objects
+ insert,
+ save,
+ // update: body is a two-element List holding the update criteria and the new object
+ update,
+
+ // delete operations
+ remove,
+
+ // others
+ // getDbStats / getColStats: statistics of the target database / collection
+ getDbStats,
+ getColStats,
+ // count: number of documents in the target collection
+ count,
+
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.CommandResult;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.WriteConcern;
+import com.mongodb.WriteResult;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The MongoDb producer.
+ * <p/>
+ * Resolves the operation to run (the endpoint default, overridable per message through the
+ * {@link MongoDbConstants#OPERATION_HEADER} header), executes it against the target collection and places the
+ * outcome on the OUT message. Failures are reported by setting a {@link CamelMongoDbException} on the Exchange.
+ */
+public class MongoDbProducer extends DefaultProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDbProducer.class);
+    private final MongoDbEndpoint endpoint;
+
+    public MongoDbProducer(MongoDbEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Determines the operation for this Exchange and invokes it. An operation header, when present, takes precedence
+     * over the endpoint's operation and may hold either a {@link MongoDbOperation} or its name.
+     * Any exception raised while resolving or running the operation is set on the Exchange rather than thrown.
+     * @param exchange the Exchange to process
+     */
+    public void process(Exchange exchange) throws Exception {
+        MongoDbOperation operation = endpoint.getOperation();
+        Object header = exchange.getIn().getHeader(MongoDbConstants.OPERATION_HEADER);
+        if (header != null) {
+            LOG.debug("Overriding default operation with operation specified on header: {}", header);
+            try {
+                if (header instanceof MongoDbOperation) {
+                    operation = ObjectHelper.cast(MongoDbOperation.class, header);
+                } else {
+                    // evaluate as a String
+                    operation = MongoDbOperation.valueOf(exchange.getIn().getHeader(MongoDbConstants.OPERATION_HEADER, String.class));
+                }
+            } catch (Exception e) {
+                LOG.error("Operation not supported: {}", header);
+                exchange.setException(new CamelMongoDbException("Operation specified on header is not supported. Value: " + header, e));
+                return;
+            }
+        }
+
+        try {
+            invokeOperation(operation, exchange);
+        } catch (Exception e) {
+            CamelMongoDbException partEx = MongoDbComponent.wrapInCamelMongoDbException(e);
+            LOG.error("Breaking MongoDB operation due to exception", partEx);
+            exchange.setException(partEx);
+        }
+
+    }
+
+    /**
+     * Entry method that selects the appropriate MongoDB operation and executes it.
+     * An operation without a handler results in a {@link CamelMongoDbException} being set on the Exchange.
+     * @param operation the operation to run
+     * @param exchange the Exchange providing the input and receiving the result
+     * @throws Exception if the selected operation fails
+     */
+    protected void invokeOperation(MongoDbOperation operation, Exchange exchange) throws Exception {
+        switch (operation) {
+        case count:
+            doCount(exchange);
+            break;
+
+        case findOneByQuery:
+            doFindOneByQuery(exchange);
+            break;
+
+        case findById:
+            doFindById(exchange);
+            break;
+
+        case findAll:
+            doFindAll(exchange);
+            break;
+
+        case insert:
+            doInsert(exchange);
+            break;
+
+        case save:
+            doSave(exchange);
+            break;
+
+        case update:
+            doUpdate(exchange);
+            break;
+
+        case remove:
+            doRemove(exchange);
+            break;
+
+        case getDbStats:
+            doGetStats(exchange, 'D');
+            break;
+
+        case getColStats:
+            doGetStats(exchange, 'C');
+            break;
+
+        default:
+            LOG.error("Unexpected operation found: {}", operation);
+            exchange.setException(new CamelMongoDbException("Operation not supported. Value: " + operation));
+            break;
+        }
+    }
+
+    // ----------- MongoDB operations ----------------
+
+    /**
+     * Places collection ('C') or database ('D') statistics on the OUT body. Any other selector yields a null body.
+     * @param exchange the Exchange receiving the statistics
+     * @param c 'C' for collection statistics, 'D' for database statistics
+     */
+    protected void doGetStats(Exchange exchange, char c) {
+        DBObject result = null;
+
+        if (c == 'C') {
+            result = calculateCollection(exchange).getStats();
+        } else if (c == 'D') {
+            // if it's a DB, also take into account the dynamicity option and the DB that is used
+            result = calculateCollection(exchange).getDB().getStats();
+        }
+
+        exchange.getOut().setBody(result);
+    }
+
+    /**
+     * Removes the documents matching the mandatory {@link DBObject} body. The OUT body is the {@link WriteResult}
+     * and the {@link MongoDbConstants#RECORDS_AFFECTED} header carries {@link WriteResult#getN()}.
+     */
+    protected void doRemove(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        DBObject removeObj = exchange.getIn().getMandatoryBody(DBObject.class);
+
+        WriteConcern wc = extractWriteConcern(exchange);
+        WriteResult result = wc == null ? dbCol.remove(removeObj) : dbCol.remove(removeObj, wc);
+        processWriteResult(result, exchange);
+
+        Message out = exchange.getOut();
+        // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or
+        // obtain the cached CommandResult
+        out.setBody(result);
+        out.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
+    }
+
+    /**
+     * Updates documents. The mandatory body is a two-element List: the update criteria followed by the new object.
+     * The {@link MongoDbConstants#MULTIUPDATE} and {@link MongoDbConstants#UPSERT} headers are optional and
+     * default to false.
+     * @throws CamelMongoDbException if the body List does not hold exactly two elements
+     */
+    @SuppressWarnings("unchecked")
+    protected void doUpdate(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        List<DBObject> saveObj = exchange.getIn().getMandatoryBody((Class<List<DBObject>>) (Class<?>) List.class);
+        if (saveObj.size() != 2) {
+            throw new CamelMongoDbException("MongoDB operation = update, failed because body is not a List of DBObject objects with size = 2");
+        }
+
+        DBObject updateCriteria = saveObj.get(0);
+        DBObject objNew = saveObj.get(1);
+
+        Boolean multi = exchange.getIn().getHeader(MongoDbConstants.MULTIUPDATE, Boolean.class);
+        Boolean upsert = exchange.getIn().getHeader(MongoDbConstants.UPSERT, Boolean.class);
+
+        WriteResult result;
+        WriteConcern wc = extractWriteConcern(exchange);
+        if (multi == null && upsert == null) {
+            // neither flag was supplied: use the two-argument update when there is no WriteConcern either; with a
+            // WriteConcern there is no signature without the multi and upsert args, so pass false for both
+            result = wc == null ? dbCol.update(updateCriteria, objNew) : dbCol.update(updateCriteria, objNew, false, false, wc);
+        } else {
+            // we calculate the final boolean values so that if any of these parameters is null, it is resolved to false
+            result = wc == null ? dbCol.update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi))
+                    : dbCol.update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi), wc);
+        }
+
+        processWriteResult(result, exchange);
+        Message out = exchange.getOut();
+        // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or
+        // obtain the cached CommandResult
+        out.setBody(result);
+        out.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
+    }
+
+    /**
+     * Saves the mandatory {@link DBObject} body and places the {@link WriteResult} on the OUT body.
+     */
+    protected void doSave(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        DBObject saveObj = exchange.getIn().getMandatoryBody(DBObject.class);
+
+        WriteConcern wc = extractWriteConcern(exchange);
+        WriteResult result = wc == null ? dbCol.save(saveObj) : dbCol.save(saveObj, wc);
+        processWriteResult(result, exchange);
+        // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or
+        // obtain the cached CommandResult
+        exchange.getOut().setBody(result);
+    }
+
+    /**
+     * Looks up one document, passing the mandatory body (of any type) to <tt>findOne</tt>. An optional
+     * {@link MongoDbConstants#FIELDS_FILTER} header restricts the returned fields. The
+     * {@link MongoDbConstants#RESULT_TOTAL_SIZE} header is 1 when a document was found and 0 otherwise.
+     */
+    protected void doFindById(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        Object o = exchange.getIn().getMandatoryBody();
+        DBObject ret;
+
+        DBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, DBObject.class);
+        if (fieldFilter == null) {
+            ret = dbCol.findOne(o);
+        } else {
+            ret = dbCol.findOne(o, fieldFilter);
+        }
+
+        Message out = exchange.getOut();
+        out.setBody(ret);
+        out.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
+
+    }
+
+    /**
+     * Inserts either a single object (body convertible to {@link DBObject}) or several (body convertible to a List
+     * whose items are each convertible to {@link DBObject}). The OUT body is the {@link WriteResult}.
+     * @throws CamelMongoDbException if the body is neither of the two supported shapes
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    protected void doInsert(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        boolean singleInsert = true;
+        Object insert = exchange.getIn().getBody(DBObject.class);
+        // body could not be converted to DBObject, check to see if it's of type List<DBObject>
+        if (insert == null) {
+            insert = exchange.getIn().getBody(List.class);
+            // if the body of type List was obtained, ensure that all items are of type DBObject and cast the List to List<DBObject>
+            if (insert != null) {
+                singleInsert = false;
+                insert = attemptConvertToList((List) insert, exchange);
+            } else {
+                throw new CamelMongoDbException("MongoDB operation = insert, Body is not conversible to type DBObject nor List<DBObject>");
+            }
+        }
+
+        WriteResult result;
+        WriteConcern wc = extractWriteConcern(exchange);
+        if (singleInsert) {
+            result = wc == null ? dbCol.insert((DBObject) insert) : dbCol.insert((DBObject) insert, wc);
+        } else {
+            result = wc == null ? dbCol.insert((List<DBObject>) insert) : dbCol.insert((List<DBObject>) insert, wc);
+        }
+
+        processWriteResult(result, exchange);
+
+        // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or
+        // obtain the cached CommandResult
+        exchange.getOut().setBody(result);
+    }
+
+    /**
+     * Runs a cursor-based query. The body, when present, is used as the query; the optional headers
+     * {@link MongoDbConstants#FIELDS_FILTER}, {@link MongoDbConstants#SORT_BY}, {@link MongoDbConstants#BATCH_SIZE},
+     * {@link MongoDbConstants#NUM_TO_SKIP} and {@link MongoDbConstants#LIMIT} tune the cursor. The OUT body is the
+     * cursor content as a List; {@link MongoDbConstants#RESULT_TOTAL_SIZE} and
+     * {@link MongoDbConstants#RESULT_PAGE_SIZE} carry the cursor's <tt>count()</tt> and <tt>size()</tt>.
+     * The cursor is always closed before returning.
+     */
+    protected void doFindAll(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        // do not use getMandatoryBody, because if the body is empty we want to retrieve all objects in the collection
+        DBObject query = null;
+        // do not run around looking for a type converter unless there is a need for it
+        if (exchange.getIn().getBody() != null) {
+            query = exchange.getIn().getBody(DBObject.class);
+        }
+        DBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, DBObject.class);
+
+        // get the batch size and number to skip
+        Integer batchSize = exchange.getIn().getHeader(MongoDbConstants.BATCH_SIZE, Integer.class);
+        Integer numToSkip = exchange.getIn().getHeader(MongoDbConstants.NUM_TO_SKIP, Integer.class);
+        Integer limit = exchange.getIn().getHeader(MongoDbConstants.LIMIT, Integer.class);
+        DBObject sortBy = exchange.getIn().getHeader(MongoDbConstants.SORT_BY, DBObject.class);
+        DBCursor ret = null;
+        // exceptions simply propagate to the caller; the finally block only guarantees the cursor is released
+        try {
+            if (query == null && fieldFilter == null) {
+                ret = dbCol.find(new BasicDBObject());
+            } else if (fieldFilter == null) {
+                ret = dbCol.find(query);
+            } else {
+                ret = dbCol.find(query, fieldFilter);
+            }
+
+            if (sortBy != null) {
+                ret.sort(sortBy);
+            }
+
+            if (batchSize != null) {
+                ret.batchSize(batchSize.intValue());
+            }
+
+            if (numToSkip != null) {
+                ret.skip(numToSkip.intValue());
+            }
+
+            if (limit != null) {
+                ret.limit(limit.intValue());
+            }
+
+            Message out = exchange.getOut();
+            out.setBody(ret.toArray());
+            out.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret.count());
+            out.setHeader(MongoDbConstants.RESULT_PAGE_SIZE, ret.size());
+
+        } finally {
+            // make sure the cursor is closed
+            if (ret != null) {
+                ret.close();
+            }
+        }
+
+    }
+
+    /**
+     * Looks up one document using the mandatory {@link DBObject} body as the query. An optional
+     * {@link MongoDbConstants#FIELDS_FILTER} header restricts the returned fields. The
+     * {@link MongoDbConstants#RESULT_TOTAL_SIZE} header is 1 when a document was found and 0 otherwise.
+     */
+    protected void doFindOneByQuery(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        DBObject o = exchange.getIn().getMandatoryBody(DBObject.class);
+        DBObject ret;
+
+        DBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, DBObject.class);
+        if (fieldFilter == null) {
+            ret = dbCol.findOne(o);
+        } else {
+            ret = dbCol.findOne(o, fieldFilter);
+        }
+
+        Message out = exchange.getOut();
+        out.setBody(ret);
+        out.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
+    }
+
+    /**
+     * Places the number of documents in the target collection on the OUT body as a {@link Long}.
+     */
+    protected void doCount(Exchange exchange) throws Exception {
+        DBCollection dbCol = calculateCollection(exchange);
+        Long answer = Long.valueOf(dbCol.count());
+        exchange.getOut().setBody(answer);
+    }
+
+    // --------- Convenience methods -----------------------
+
+    /**
+     * Resolves the collection to operate on. With dynamicity disabled, or when neither the
+     * {@link MongoDbConstants#DATABASE} nor the {@link MongoDbConstants#COLLECTION} header is present, this is the
+     * endpoint's own collection; otherwise the headers override the database and/or the collection.
+     */
+    private DBCollection calculateCollection(Exchange exchange) {
+        // dynamic calculation is an option. In most cases it won't be used and we should not penalise all users with running this
+        // resolution logic on every Exchange if they won't be using this functionality at all
+        if (!endpoint.isDynamicity()) {
+            return endpoint.getDbCollection();
+        }
+
+        String dynamicDB = exchange.getIn().getHeader(MongoDbConstants.DATABASE, String.class);
+        String dynamicCollection = exchange.getIn().getHeader(MongoDbConstants.COLLECTION, String.class);
+
+        if (dynamicDB == null && dynamicCollection == null) {
+            return endpoint.getDbCollection();
+        }
+
+        DB db = endpoint.getDb();
+        if (dynamicDB != null) {
+            db = endpoint.getMongoConnection().getDB(dynamicDB);
+        }
+
+        DBCollection dbCol = dynamicCollection == null ? db.getCollection(endpoint.getCollection()) : db.getCollection(dynamicCollection);
+        LOG.debug("Dynamic database and/or collection selected: {}->{}", dbCol.getDB().getName(), dbCol.getName());
+        return dbCol;
+    }
+
+    /**
+     * Resolves a possibly-null Boolean to a primitive, treating null as false.
+     */
+    private boolean calculateBooleanValue(Boolean b) {
+        return b != null && b.booleanValue();
+    }
+
+    /**
+     * Publishes the last error of a write as the {@link MongoDbConstants#LAST_ERROR} header and, when it is not ok,
+     * sets the corresponding exception on the Exchange. This only happens when the endpoint is configured to invoke
+     * getLastError or the endpoint's WriteConcern calls it implicitly.
+     */
+    private void processWriteResult(WriteResult result, Exchange exchange) {
+        // if invokeGetLastError is set, or a WriteConcern is set which implicitly calls getLastError, then we have the chance to populate
+        // the MONGODB_LAST_ERROR header, as well as setting an exception on the Exchange if one occurred at the MongoDB server
+        if (endpoint.isInvokeGetLastError() || (endpoint.getWriteConcern() != null && endpoint.getWriteConcern().callGetLastError())) {
+            CommandResult cr = result.getCachedLastError() == null ? result.getLastError() : result.getCachedLastError();
+            exchange.getOut().setHeader(MongoDbConstants.LAST_ERROR, cr);
+            if (!cr.ok()) {
+                exchange.setException(MongoDbComponent.wrapInCamelMongoDbException(cr.getException()));
+            }
+        }
+    }
+
+    /**
+     * Resolves the WriteConcern requested through the {@link MongoDbConstants#WRITECONCERN} header, which may hold
+     * a {@link WriteConcern} instance or the name of a standard one.
+     * @return the requested WriteConcern, or null when the header is absent or holds an unsupported type
+     * @throws CamelMongoDbException if the header holds a name that does not resolve to a WriteConcern
+     */
+    private WriteConcern extractWriteConcern(Exchange exchange) throws CamelMongoDbException {
+        Object o = exchange.getIn().getHeader(MongoDbConstants.WRITECONCERN);
+
+        if (o == null) {
+            return null;
+        } else if (o instanceof WriteConcern) {
+            return ObjectHelper.cast(WriteConcern.class, o);
+        } else if (o instanceof String) {
+            WriteConcern answer = WriteConcern.valueOf(ObjectHelper.cast(String.class, o));
+            if (answer == null) {
+                throw new CamelMongoDbException("WriteConcern specified in the " + MongoDbConstants.WRITECONCERN
+                        + " header, with value " + o + " could not be resolved to a WriteConcern type");
+            }
+            // hand back the resolved WriteConcern so that the header value is actually honoured
+            return answer;
+        }
+
+        // the header holds neither a WriteConcern nor a String
+        LOG.warn("A problem occurred while resolving the Exchange's Write Concern");
+        return null;
+    }
+
+    /**
+     * Converts every item of the given List to a {@link DBObject} using the context's type converter.
+     * @throws CamelMongoDbException if any item cannot be converted
+     */
+    @SuppressWarnings("rawtypes")
+    private List<DBObject> attemptConvertToList(List insertList, Exchange exchange) throws CamelMongoDbException {
+        List<DBObject> dbObjectList = new ArrayList<DBObject>(insertList.size());
+        TypeConverter converter = exchange.getContext().getTypeConverter();
+        for (Object item : insertList) {
+            try {
+                DBObject dbObject = converter.mandatoryConvertTo(DBObject.class, item);
+                dbObjectList.add(dbObject);
+            } catch (Exception e) {
+                throw new CamelMongoDbException("MongoDB operation = insert, Assuming List variant of MongoDB insert operation, but List contains non-DBObject items", e);
+            }
+        }
+        return dbObjectList;
+    }
+
+}
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+/**
+ * Immutable holder of the tail tracking settings. The collection and field names fall back to
+ * {@link #DEFAULT_COLLECTION} and {@link #DEFAULT_FIELD} when passed as null; every other value is stored as given.
+ */
+public class MongoDbTailTrackingConfig {
+
+    public static final String DEFAULT_COLLECTION = "camelTailTracking";
+    public static final String DEFAULT_FIELD = "lastTrackingValue";
+
+    /**
+     * See {@link MongoDbEndpoint#setTailTrackIncreasingField(String)}
+     */
+    public final String increasingField;
+    /**
+     * See {@link MongoDbEndpoint#setPersistentTailTracking(boolean)}
+     */
+    public final boolean persistent;
+    /**
+     * See {@link MongoDbEndpoint#setTailTrackDb(String)}
+     */
+    public final String db;
+    /**
+     * See {@link MongoDbEndpoint#setTailTrackCollection(String)}
+     */
+    public final String collection;
+    /**
+     * See {@link MongoDbEndpoint#setTailTrackField(String)}
+     */
+    public final String field;
+    /**
+     * See {@link MongoDbEndpoint#setPersistentId(String)}
+     */
+    public final String persistentId;
+
+    /**
+     * Captures the given settings, substituting the defaults for a null collection or field name.
+     */
+    public MongoDbTailTrackingConfig(boolean persistentTailTracking, String tailTrackIncreasingField, String tailTrackDb,
+            String tailTrackCollection, String tailTrackField, String persistentId) {
+        this.persistent = persistentTailTracking;
+        this.increasingField = tailTrackIncreasingField;
+        this.db = tailTrackDb;
+        this.persistentId = persistentId;
+
+        if (tailTrackCollection != null) {
+            this.collection = tailTrackCollection;
+        } else {
+            this.collection = DEFAULT_COLLECTION;
+        }
+
+        if (tailTrackField != null) {
+            this.field = tailTrackField;
+        } else {
+            this.field = DEFAULT_FIELD;
+        }
+    }
+}
\ No newline at end of file
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.Mongo;
+import com.mongodb.WriteConcern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks the last value of the configured increasing field that the tailing consumer has seen and,
+ * when persistent tail tracking is enabled, stores it in / recovers it from a MongoDB collection.
+ * <p/>
+ * One tracking document per {@code persistentId} is kept in the tracking collection; the document is
+ * addressed by its {@code _id} once {@link #initialize()} has run.
+ */
+public class MongoDbTailTrackingManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailTrackingManager.class);
+
+    /**
+     * Last value read for the increasing field; {@code null} until a value is set or recovered.
+     */
+    public Object lastVal;
+
+    private final Mongo connection;
+    private final MongoDbTailTrackingConfig config;
+    private DBCollection dbCol;
+    // query object holding only the _id of this consumer's tracking document
+    private DBObject trackingObj;
+
+    public MongoDbTailTrackingManager(Mongo connection, MongoDbTailTrackingConfig config) {
+        this.connection = connection;
+        this.config = config;
+    }
+
+    /**
+     * Locates (or creates) the tracking document for the configured persistentId.
+     * Does nothing when persistent tail tracking is disabled.
+     *
+     * @throws Exception if the tracking document cannot be read back after it was inserted
+     */
+    public void initialize() throws Exception {
+        if (!config.persistent) {
+            return;
+        }
+
+        dbCol = connection.getDB(config.db).getCollection(config.collection);
+        DBObject filter = new BasicDBObject("persistentId", config.persistentId);
+        trackingObj = dbCol.findOne(filter);
+        if (trackingObj == null) {
+            dbCol.insert(filter, WriteConcern.SAFE);
+            // look the document up by persistentId again: a findOne() without a query may return the
+            // tracking document of a different persistentId stored in the same collection
+            trackingObj = dbCol.findOne(filter);
+            if (trackingObj == null) {
+                throw new IllegalStateException("Could not read back tail tracking document for persistentId "
+                    + config.persistentId + " from collection " + config.collection);
+            }
+        }
+        // keep only the _id, the rest is useless and causes more overhead during update
+        trackingObj = new BasicDBObject("_id", trackingObj.get("_id"));
+    }
+
+    /**
+     * Writes {@link #lastVal} to the tracking document. Does nothing when persistence is disabled
+     * or no value has been recorded yet.
+     */
+    public synchronized void persistToStore() {
+        if (!config.persistent || lastVal == null) {
+            return;
+        }
+
+        LOG.debug("Persisting lastVal={} to store, collection: {}", lastVal, config.collection);
+
+        DBObject updateObj = BasicDBObjectBuilder.start().add("$set", new BasicDBObject(config.field, lastVal)).get();
+        // trackingObj stays the _id-only key: re-reading it with an unfiltered findOne() would swap it
+        // for an arbitrary document of the collection and make later updates target the wrong one
+        dbCol.update(trackingObj, updateObj, false, false, WriteConcern.SAFE);
+    }
+
+    /**
+     * Reads the persisted value back into {@link #lastVal}.
+     *
+     * @return the recovered value, or {@code null} when persistence is disabled, the tracking document
+     *         no longer exists, or no value has been stored yet
+     */
+    public synchronized Object recoverFromStore() {
+        if (!config.persistent) {
+            return null;
+        }
+
+        DBObject persisted = dbCol.findOne(trackingObj);
+        // the tracking document may have been removed since initialize(); treat that as "nothing stored"
+        lastVal = persisted == null ? null : persisted.get(config.field);
+
+        LOG.debug("Recovered lastVal={} from store, collection: {}", lastVal, config.collection);
+
+        return lastVal;
+    }
+
+    /**
+     * Records the increasing-field value of the given document as the last value seen.
+     * Ignored when no increasing field is configured.
+     *
+     * @param o the document that was just consumed
+     */
+    public void setLastVal(DBObject o) {
+        if (config.increasingField == null) {
+            return;
+        }
+
+        lastVal = o.get(config.increasingField);
+    }
+
+    /**
+     * @return the name of the configured increasing field, possibly {@code null}
+     */
+    public String getIncreasingFieldName() {
+        return config.increasingField;
+    }
+}
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+
+/**
+ * The MongoDb consumer.
+ * <p/>
+ * On start it creates a single-threaded pool and runs a {@link MongoDbTailingProcess} on it;
+ * on stop it stops that process and releases the pool.
+ */
+public class MongoDbTailableCursorConsumer extends DefaultConsumer {
+    private final MongoDbEndpoint endpoint;
+    private ExecutorService executor;
+    private MongoDbTailingProcess tailingProcess;
+
+    public MongoDbTailableCursorConsumer(MongoDbEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Stops the tailing process (if one was started) and then shuts the thread pool down.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        try {
+            if (tailingProcess != null) {
+                tailingProcess.stop();
+                tailingProcess = null;
+            }
+        } finally {
+            // release the pool even if stopping the tailing process failed
+            if (executor != null) {
+                // shut down through the manager that created the pool instead of calling shutdown() directly
+                endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
+                executor = null;
+            }
+        }
+    }
+
+    /**
+     * Initialises tail tracking and the tailing process, then submits the process to a new
+     * single-threaded pool.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        MongoDbTailTrackingManager trackingManager = initTailTracking();
+        MongoDbTailingProcess process = new MongoDbTailingProcess(endpoint, this, trackingManager);
+        process.initializeProcess();
+        // only create the pool and publish the process once initialisation succeeded, so a failed
+        // start leaves neither a leaked pool nor a never-run process for doStop() to wait on
+        executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), 1);
+        tailingProcess = process;
+        executor.execute(process);
+    }
+
+    /**
+     * Creates and initialises the tail tracking manager from the endpoint's connection and
+     * tail tracking configuration.
+     */
+    protected MongoDbTailTrackingManager initTailTracking() throws Exception {
+        MongoDbTailTrackingManager answer = new MongoDbTailTrackingManager(endpoint.getMongoConnection(), endpoint.getTailTrackingConfig());
+        answer.initialize();
+        return answer;
+    }
+
+}
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailableCursorConsumer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java?rev=1244980&view=auto
==============================================================================
--- camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java (added)
+++ camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java Thu Feb 16 14:09:55 2012
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.mongodb;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.Bytes;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.MongoException.CursorNotFound;
+
+import org.apache.camel.Exchange;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runnable that reads documents from a collection through a tailable cursor and passes each one
+ * to the consumer's processor as an exchange. When the cursor is exhausted or dies, it is
+ * regenerated (optionally after a delay) starting after the last tracked value.
+ */
+public class MongoDbTailingProcess implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailingProcess.class);
+    private static final String CAPPED_KEY = "capped";
+
+    /** Cleared by {@link #stop()} to ask the tailing loop to finish. */
+    public volatile boolean keepRunning = true;
+    /** Set once {@link #run()} has finished, whether normally or with an exception. */
+    public volatile boolean stopped; // = false
+
+    private final DBCollection dbCol;
+    private final MongoDbEndpoint endpoint;
+    private final MongoDbTailableCursorConsumer consumer;
+
+    // create local, final copies of these variables for increased performance
+    private final long cursorRegenerationDelay;
+    private final boolean cursorRegenerationDelayEnabled;
+
+    // monitor used to hand the 'stopped' signal from run() to a thread waiting in stop()
+    private final Object stopMonitor = new Object();
+    // set as soon as run() is entered; lets stop() know whether there is anything to wait for
+    private volatile boolean started;
+
+    // replaced by the tailing thread and closed by the thread calling stop(), hence volatile
+    private volatile DBCursor cursor;
+    private MongoDbTailTrackingManager tailTracking;
+
+
+    public MongoDbTailingProcess(MongoDbEndpoint endpoint, MongoDbTailableCursorConsumer consumer, MongoDbTailTrackingManager tailTrack) {
+        this.endpoint = endpoint;
+        this.consumer = consumer;
+        this.dbCol = endpoint.getDbCollection();
+        this.tailTracking = tailTrack;
+        this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
+        this.cursorRegenerationDelayEnabled = !(this.cursorRegenerationDelay == 0);
+    }
+
+    public DBCursor getCursor() {
+        return cursor;
+    }
+
+    /**
+     * Initialise the tailing process, the cursor and if persistent tail tracking is enabled, recover the cursor from the persisted point.
+     * As part of the initialisation process, the component will validate that the collection we are targeting is 'capped'.
+     *
+     * @throws Exception if the collection is not capped or the tailable cursor cannot be created
+     */
+    public void initializeProcess() throws Exception {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}", "db: " + dbCol.getDB() + ", col: " + dbCol.getName());
+        }
+
+        if (!isCollectionCapped()) {
+            throw new CamelMongoDbException("Tailable cursors are only compatible with capped collections, and collection " + dbCol.getName()
+                    + " is not capped");
+        }
+        try {
+            // recover the last value from the store if it exists
+            tailTracking.recoverFromStore();
+            cursor = initializeCursor();
+        } catch (Exception e) {
+            throw new CamelMongoDbException("Exception ocurred while initializing tailable cursor", e);
+        }
+
+        if (cursor == null) {
+            throw new CamelMongoDbException("Tailable cursor was not initialized, or cursor returned is dead on arrival");
+        }
+
+    }
+
+    /**
+     * Main loop of the tailing thread: consume the current cursor, then regenerate it and repeat
+     * until {@link #keepRunning} is cleared or the thread is interrupted.
+     */
+    @Override
+    public void run() {
+        started = true;
+        try {
+            while (keepRunning) {
+                doRun();
+                // if the previous call didn't return because we have stopped running, then regenerate the cursor
+                if (keepRunning) {
+                    cursor.close();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Regenerating cursor with lastVal: {}, waiting {}ms first", tailTracking.lastVal, cursorRegenerationDelay);
+                    }
+
+                    if (cursorRegenerationDelayEnabled) {
+                        try {
+                            Thread.sleep(cursorRegenerationDelay);
+                        } catch (InterruptedException e) {
+                            LOG.error("Thread was interrupted", e);
+                            // restore the interrupt flag and treat the interruption as a request to finish
+                            Thread.currentThread().interrupt();
+                            break;
+                        }
+                    }
+
+                    cursor = initializeCursor();
+                }
+            }
+        } finally {
+            // always signal completion, even when the loop ends with an exception,
+            // otherwise stop() would wait forever
+            synchronized (stopMonitor) {
+                stopped = true;
+                stopMonitor.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * Asks the tailing loop to finish, closes the current cursor and waits until {@link #run()} has
+     * ended. Returns without waiting when {@link #run()} was never entered.
+     *
+     * @throws Exception if the calling thread is interrupted while waiting
+     */
+    protected void stop() throws Exception {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + dbCol.getDB() + ", col: " + dbCol.getName());
+        }
+        keepRunning = false;
+        // close the cursor if it's open, so if it is blocked on hasNext() it will return immediately
+        DBCursor current = cursor;
+        if (current != null) {
+            current.close();
+        }
+        // wait until the main loop acknowledges the stop; block on the monitor instead of spinning,
+        // and skip the wait when run() never started because nothing would ever set 'stopped'
+        if (started) {
+            synchronized (stopMonitor) {
+                while (!stopped) {
+                    stopMonitor.wait();
+                }
+            }
+        }
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}", "db: " + dbCol.getDB() + ", col: " + dbCol.getName());
+        }
+    }
+
+    /**
+     * Reads the collection stats and reports whether they flag the collection as capped.
+     * The raw value is inspected so that a missing entry simply means "not capped".
+     */
+    private boolean isCollectionCapped() {
+        Object capped = dbCol.getStats().get(CAPPED_KEY);
+        if (capped instanceof Number) {
+            return ((Number) capped).intValue() == 1;
+        }
+        return Boolean.TRUE.equals(capped);
+    }
+
+    /**
+     * Drains the current cursor: every document is turned into an exchange, sent to the processor and
+     * recorded in the tail tracker. When the cursor ends, the last tracked value is persisted.
+     */
+    private void doRun() {
+        // while the cursor has more values, keepRunning is true and the cursorId is not 0, which symbolizes that the cursor is dead
+        try {
+            while (cursor.hasNext() && cursor.getCursorId() != 0 && keepRunning) {
+                DBObject dbObj = cursor.next();
+                Exchange exchange = endpoint.createMongoDbExchange(dbObj);
+                try {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Sending exchange: {}, ObjectId: {}", exchange, dbObj.get("_id"));
+                    }
+                    consumer.getProcessor().process(exchange);
+                } catch (Exception e) {
+                    LOG.warn("Exception ocurred while processing exchange with ID " + exchange.getExchangeId(), e);
+                    if (exchange.getException() != e) {
+                        exchange.setException(e);
+                    }
+                }
+                tailTracking.setLastVal(dbObj);
+            }
+        } catch (CursorNotFound e) {
+            // we only log the warning if we are not stopping, otherwise it is expected because the stop() method kills the cursor just in case it is blocked
+            // waiting for more data to arrive
+            if (keepRunning) {
+                LOG.debug("Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", e);
+            }
+        }
+
+        // the loop finished, persist the lastValue just in case we are shutting down
+        // TODO: perhaps add a functionality to persist every N records
+        tailTracking.persistToStore();
+    }
+
+    /**
+     * Creates a tailable, await-data cursor. When a last value is known, only documents whose
+     * increasing field is greater than that value are queried; otherwise the whole collection is.
+     */
+    private DBCursor initializeCursor() {
+        Object lastVal = tailTracking.lastVal;
+        // lastVal can be null if we are initializing and there is no persistence enabled
+        DBCursor answer;
+        if (lastVal == null) {
+            answer = dbCol.find().addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);
+        } else {
+            DBObject queryObj = new BasicDBObject(tailTracking.getIncreasingFieldName(), new BasicDBObject("$gt", lastVal));
+            answer = dbCol.find(queryObj).addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);
+        }
+        return answer;
+    }
+}
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
------------------------------------------------------------------------------
svn:keywords = Rev Date