You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/28 13:15:48 UTC
svn commit: r789087 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/ main/java/org/apache/camel/component/file/
main/java/org/apache/camel/component/file/strategy/
main/java/org/apache/camel/impl/ main/java/org/apache/camel/util/
test/java...
Author: davsclaus
Date: Sun Jun 28 11:15:47 2009
New Revision: 789087
URL: http://svn.apache.org/viewvc?rev=789087&view=rev
Log:
CAMEL-1750: Introduced PollingConsumerPollStrategy to control behavior for scheduled consumer allowing you to handle exceptions related to polling, eg network not reachable etc.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java (contents, props changed)
- copied, changed from r789058, camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
- copied, changed from r789058, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSkipDotFilesTest.java
Removed:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileComponent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MessageSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java?rev=789087&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java Sun Jun 28 11:15:47 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Strategy for a {@link org.apache.camel.PollingConsumer} when polling an {@link org.apache.camel.Endpoint}.
+ * <p/>
+ * This pluggable strategy allows to plugin different implementations what to do, most noticeable what to
+ * do in case the polling goes wrong. This can be handled in the {@link #rollback(Consumer, Endpoint, Exception) rollback}
+ * method.
+ *
+ * @version $Revision$
+ */
+public interface PollingConsumerPollStrategy {
+
+ /**
+ * Called when poll is about to begin
+ *
+ * @param consumer the consumer
+ * @param endpoint the endpoint being consumed
+ */
+ public void begin(Consumer consumer, Endpoint endpoint);
+
+ /**
+ * Called when poll is completed sucesfully
+ *
+ * @param consumer the consumer
+ * @param endpoint the endpoint being consumed
+ */
+ public void commit(Consumer consumer, Endpoint endpoint);
+
+ /**
+ * Called when poll failed
+ *
+ * @param consumer the consumer
+ * @param endpoint the endpoint being consumed
+ * @param cause the caused exception
+ * @throws Exception can be used to rethrow the caused exception
+ */
+ public void rollback(Consumer consumer, Endpoint endpoint, Exception cause) throws Exception;
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileComponent.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileComponent.java Sun Jun 28 11:15:47 2009
@@ -22,6 +22,7 @@
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.EndpointHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -52,7 +53,7 @@
// sort by using file language
String sortBy = getAndRemoveParameter(parameters, "sortBy", String.class);
- if (isNotEmpty(sortBy) && !isReferenceParameter(sortBy)) {
+ if (isNotEmpty(sortBy) && !EndpointHelper.isReferenceParameter(sortBy)) {
// we support nested sort groups so they should be chained
String[] groups = sortBy.split(";");
Iterator<String> it = ObjectHelper.createIterator(groups);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sun Jun 28 11:15:47 2009
@@ -120,8 +120,6 @@
// process the current exchange
processExchange(exchange);
-
-
}
// remove the file from the in progress list in case the batch was limited by max messages per poll
@@ -130,7 +128,6 @@
String key = exchange.getGenericFile().getFileName();
endpoint.getInProgressRepository().remove(key);
}
-
}
/**
@@ -166,7 +163,7 @@
boolean begin = processStrategy.begin(operations, endpoint, exchange, exchange.getGenericFile());
if (!begin) {
- log.warn(endpoint + " cannot begin processing file: " + exchange.getGenericFile());
+ log.debug(endpoint + " cannot begin processing file: " + exchange.getGenericFile());
// remove file from the in progress list as its no longer in progress
endpoint.getInProgressRepository().remove(exchange.getGenericFile().getFileName());
return;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Sun Jun 28 11:15:47 2009
@@ -150,10 +150,6 @@
}
}
- public void setGenericFileProcessStrategy(GenericFileProcessStrategy<T> genericFileProcessStrategy) {
- this.processStrategy = genericFileProcessStrategy;
- }
-
public boolean isNoop() {
return noop;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java Sun Jun 28 11:15:47 2009
@@ -17,7 +17,7 @@
package org.apache.camel.component.file;
/**
- * Represents a strategy for marking that a remote file is processed.
+ * Represents a pluggable strategy when processing files.
*/
public interface GenericFileProcessStrategy<T> {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java Sun Jun 28 11:15:47 2009
@@ -48,7 +48,7 @@
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
return strategy;
} else if (moveExpression != null || preMoveExpression != null) {
- FileRenameProcessStrategy<File> strategy = new FileRenameProcessStrategy<File>();
+ GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<File>();
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
if (moveExpression != null) {
GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<File>();
@@ -63,7 +63,7 @@
return strategy;
} else {
// default strategy will move files in a .camel/ subfolder where the file was consumed
- FileRenameProcessStrategy<File> strategy = new FileRenameProcessStrategy<File>();
+ GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<File>();
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
// use context to lookup language to let it be loose coupled
Language language = context.resolveLanguage("file");
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java Sun Jun 28 11:15:47 2009
@@ -47,7 +47,7 @@
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
return strategy;
} else if (moveExpression != null || preMoveExpression != null) {
- FileRenameProcessStrategy strategy = new FileRenameProcessStrategy();
+ GenericFileRenameProcessStrategy strategy = new GenericFileRenameProcessStrategy();
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
if (moveExpression != null) {
GenericFileExpressionRenamer renamer = new GenericFileExpressionRenamer();
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java?rev=789087&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java Sun Jun 28 11:15:47 2009
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
+import org.apache.camel.component.file.GenericFileOperations;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Acquires exclusive read lock to the given file. Will wait until the lock is granted.
+ * After granting the read lock it is realeased, we just want to make sure that when we start
+ * consuming the file its not currently in progress of being written by third party.
+ */
+public class GenericFileRenameExclusiveReadLockStrategy<T> implements GenericFileExclusiveReadLockStrategy<T> {
+ private static final transient Log LOG = LogFactory.getLog(GenericFileRenameExclusiveReadLockStrategy.class);
+ private long timeout;
+
+ public boolean acquireExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file,
+ Exchange exchange) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for exclusive read lock to file: " + file);
+ }
+
+ // the trick is to try to rename the file, if we can rename then we have exclusive read
+ // since its a Generic file we cannot use java.nio to get a RW lock
+ String newName = file.getFileName() + ".camelExclusiveReadLock";
+
+ // clone and change the name
+ GenericFile<T> newFile = file.clone();
+ newFile.changeFileName(newName);
+
+ long start = System.currentTimeMillis();
+
+ boolean exclusive = false;
+ while (!exclusive) {
+ // timeout check
+ if (timeout > 0) {
+ long delta = System.currentTimeMillis() - start;
+ if (delta > timeout) {
+ LOG.debug("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
+ // we could not get the lock within the timeout period, so return false
+ return false;
+ }
+ }
+
+ exclusive = operations.renameFile(file.getAbsoluteFilePath(), newFile.getAbsoluteFilePath());
+ if (exclusive) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Acquired exclusive read lock to file: " + file);
+ }
+ // rename it back so we can read it
+ operations.renameFile(newFile.getAbsoluteFilePath(), file.getAbsoluteFilePath());
+ } else {
+ boolean interrupted = sleep();
+ if (interrupted) {
+ // we were interrupted while sleeping, we are likely being shutdown so return false
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ public void releaseExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file,
+ Exchange exchange) throws Exception {
+ // noop
+ }
+
+ private boolean sleep() {
+ LOG.trace("Exclusive read lock not granted. Sleeping for 1000 millis.");
+ try {
+ Thread.sleep(1000);
+ return false;
+ } catch (InterruptedException e) {
+ LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out");
+ return true;
+ }
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Sets an optional timeout period.
+ * <p/>
+ * If the readlock could not be granted within the timeperiod then the wait is stopped and the
+ * <tt>acquireExclusiveReadLock</tt> returns <tt>false</tt>.
+ *
+ * @param timeout period in millis
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java (from r789058, camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java&r1=789058&r2=789087&rev=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java Sun Jun 28 11:15:47 2009
@@ -24,11 +24,11 @@
import org.apache.camel.component.file.GenericFileOperationFailedException;
import org.apache.camel.component.file.GenericFileOperations;
-public class FileRenameProcessStrategy<T> extends GenericFileProcessStrategySupport<T> {
+public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrategySupport<T> {
private GenericFileRenamer<T> beginRenamer;
private GenericFileRenamer<T> commitRenamer;
- public FileRenameProcessStrategy() {
+ public GenericFileRenameProcessStrategy() {
}
@Override
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java Sun Jun 28 11:15:47 2009
@@ -31,6 +31,7 @@
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
+import org.apache.camel.util.EndpointHelper;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -209,43 +210,8 @@
*/
protected void setProperties(Object bean, Map parameters) throws Exception {
// set reference properties first as they use # syntax that fools the regular properties setter
- setReferenceProperties(bean, parameters);
- IntrospectionSupport.setProperties(getCamelContext().getTypeConverter(), bean, parameters);
- }
-
- /**
- * Sets the reference properties on the given bean
- * <p/>
- * This is convention over configuration, setting all reference parameters (using {@link #isReferenceParameter(String)}
- * by looking it up in registry and setting it on the bean if possible.
- */
- protected void setReferenceProperties(Object bean, Map parameters) throws Exception {
- Iterator it = parameters.keySet().iterator();
- while (it.hasNext()) {
- Object key = it.next();
- String value = (String) parameters.get(key);
- if (isReferenceParameter(value)) {
- Object ref = lookup(value.substring(1));
- String name = key.toString();
- if (ref != null) {
- boolean hit = IntrospectionSupport.setProperty(getCamelContext().getTypeConverter(), bean, name, ref);
- if (hit) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Configued property: " + name + " on bean: " + bean + " with value: " + ref);
- }
- // must remove as its a valid option and we could configure it
- it.remove();
- }
- }
- }
- }
- }
-
- /**
- * Is the given parameter a reference parameter (starting with a # char)
- */
- protected boolean isReferenceParameter(String parameter) {
- return parameter != null && parameter.startsWith("#");
+ EndpointHelper.setReferenceProperties(getCamelContext(), bean, parameters);
+ EndpointHelper.setProperties(getCamelContext(), bean, parameters);
}
/**
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Sun Jun 28 11:15:47 2009
@@ -21,6 +21,8 @@
import org.apache.camel.Processor;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* A default consumer useful for implementation inheritance.
@@ -28,6 +30,7 @@
* @version $Revision$
*/
public class DefaultConsumer extends ServiceSupport implements Consumer {
+ private final transient Log log = LogFactory.getLog(getClass());
private final Endpoint endpoint;
private final Processor processor;
private ExceptionHandler exceptionHandler;
@@ -62,10 +65,16 @@
}
protected void doStop() throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Stopping consumer: " + this);
+ }
ServiceHelper.stopServices(processor);
}
protected void doStart() throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Starting consumer: " + this);
+ }
ServiceHelper.startServices(processor);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java Sun Jun 28 11:15:47 2009
@@ -153,6 +153,13 @@
public <T> T getProperty(String name, Class<T> type) {
Object value = getProperty(name);
+
+ // eager same instance type test to avoid the overhead of invoking the type converter
+ // if already same type
+ if (type.isInstance(value)) {
+ return type.cast(value);
+ }
+
return ExchangeHelper.convertToType(this, type, value);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java Sun Jun 28 11:15:47 2009
@@ -44,8 +44,19 @@
public <T> T getHeader(String name, Class<T> type) {
Object value = getHeader(name);
+
+ // eager same instance type test to avoid the overhead of invoking the type converter
+ // if already same type
+ if (type.isInstance(value)) {
+ return type.cast(value);
+ }
+
Exchange e = getExchange();
- return e.getContext().getTypeConverter().convertTo(type, e, value);
+ if (e != null) {
+ return e.getContext().getTypeConverter().convertTo(type, e, value);
+ } else {
+ return (T) value;
+ }
}
public void setHeader(String name, Object value) {
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java?rev=789087&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java Sun Jun 28 11:15:47 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.PollingConsumerPollStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A default implementation that just logs a <tt>WARN</tt> level log in case of rollback.
+ *
+ * @version $Revision$
+ */
+public class DefaultPollingConsumerPollStrategy implements PollingConsumerPollStrategy {
+
+ private static final transient Log LOG = LogFactory.getLog(DefaultPollingConsumerPollStrategy.class);
+
+ public void begin(Consumer consumer, Endpoint endpoint) {
+ // noop
+ }
+
+ public void commit(Consumer consumer, Endpoint endpoint) {
+ // noop
+ }
+
+ public void rollback(Consumer consumer, Endpoint endpoint, Exception e) throws Exception {
+ LOG.warn("Consumer " + consumer + " could not poll endpoint: " + endpoint.getEndpointUri() + " caused by: " + e.getMessage(), e);
+ }
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java Sun Jun 28 11:15:47 2009
@@ -20,6 +20,8 @@
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Producer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* A default implementation of @{link Producer} for implementation inheritence
@@ -27,6 +29,7 @@
* @version $Revision$
*/
public abstract class DefaultProducer extends ServiceSupport implements Producer {
+ private final transient Log log = LogFactory.getLog(getClass());
private final Endpoint endpoint;
public DefaultProducer(Endpoint endpoint) {
@@ -59,8 +62,14 @@
}
protected void doStart() throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Starting producer: " + this);
+ }
}
protected void doStop() throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Stopping producer: " + this);
+ }
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java Sun Jun 28 11:15:47 2009
@@ -20,7 +20,6 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.Message;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MessageSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MessageSupport.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MessageSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MessageSupport.java Sun Jun 28 11:15:47 2009
@@ -58,7 +58,8 @@
}
protected <T> T getBody(Class<T> type, Object body) {
- // same instance type
+ // eager same instance type test to avoid the overhead of invoking the type converter
+ // if already same type
if (type.isInstance(body)) {
return type.cast(body);
}
@@ -88,6 +89,12 @@
}
public <T> T getMandatoryBody(Class<T> type) throws InvalidPayloadException {
+ // eager same instance type test to avoid the overhead of invoking the type converter
+ // if already same type
+ if (type.isInstance(body)) {
+ return type.cast(body);
+ }
+
Exchange e = getExchange();
if (e != null) {
TypeConverter converter = e.getContext().getTypeConverter();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Sun Jun 28 11:15:47 2009
@@ -22,6 +22,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.camel.Endpoint;
+import org.apache.camel.PollingConsumerPollStrategy;
import org.apache.camel.Processor;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
@@ -44,6 +45,7 @@
private long delay = 500;
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
private boolean useFixedDelay;
+ private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
public ScheduledPollConsumer(DefaultEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -75,10 +77,16 @@
if (LOG.isTraceEnabled()) {
LOG.trace("Starting to poll: " + this.getEndpoint());
}
+ pollStrategy.begin(this, getEndpoint());
poll();
+ pollStrategy.commit(this, getEndpoint());
}
} catch (Exception e) {
- LOG.warn("An exception occurred while polling: " + this.getEndpoint() + ": " + e.getMessage(), e);
+ try {
+ pollStrategy.rollback(this, getEndpoint(), e);
+ } catch (Exception re) {
+ throw ObjectHelper.wrapRuntimeCamelException(re);
+ }
}
if (LOG.isTraceEnabled()) {
@@ -120,6 +128,14 @@
this.useFixedDelay = useFixedDelay;
}
+ public PollingConsumerPollStrategy getPollStrategy() {
+ return pollStrategy;
+ }
+
+ public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) {
+ this.pollStrategy = pollStrategy;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java Sun Jun 28 11:15:47 2009
@@ -23,6 +23,7 @@
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.util.EndpointHelper;
import org.apache.camel.util.IntrospectionSupport;
/**
@@ -58,7 +59,9 @@
protected void configureConsumer(Consumer consumer) throws Exception {
if (consumerProperties != null) {
- IntrospectionSupport.setProperties(getCamelContext().getTypeConverter(), consumer, consumerProperties);
+ // set reference properties first as they use # syntax that fools the regular properties setter
+ EndpointHelper.setReferenceProperties(getCamelContext(), consumer, consumerProperties);
+ EndpointHelper.setProperties(getCamelContext(), consumer, consumerProperties);
if (!this.isLenientProperties() && consumerProperties.size() > 0) {
throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + consumerProperties.size()
+ " parameters that couldn't be set on the endpoint consumer."
@@ -84,7 +87,8 @@
Object delay = options.remove("delay");
Object timeUnit = options.remove("timeUnit");
Object useFixedDelay = options.remove("useFixedDelay");
- if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null) {
+ Object pollStrategy = options.remove("pollStrategy");
+ if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null) {
if (consumerProperties == null) {
consumerProperties = new HashMap();
}
@@ -100,6 +104,9 @@
if (useFixedDelay != null) {
consumerProperties.put("useFixedDelay", useFixedDelay);
}
+ if (pollStrategy != null) {
+ consumerProperties.put("pollStrategy", pollStrategy);
+ }
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java Sun Jun 28 11:15:47 2009
@@ -16,8 +16,11 @@
*/
package org.apache.camel.util;
+import java.util.Iterator;
+import java.util.Map;
import java.util.regex.PatternSyntaxException;
+import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
@@ -113,4 +116,59 @@
return false;
}
+ /**
+ * Sets the regular properties on the given bean
+ *
+ * @param context the camel context
+ * @param bean the bean
+ * @param parameters parameters
+ * @throws Exception is thrown if setting property fails
+ */
+ public static void setProperties(CamelContext context, Object bean, Map parameters) throws Exception {
+ IntrospectionSupport.setProperties(context.getTypeConverter(), bean, parameters);
+ }
+
+ /**
+ * Sets the reference properties on the given bean
+ * <p/>
+ * This is convention over configuration, setting all reference parameters (using {@link #isReferenceParameter(String)}
+ * by looking it up in registry and setting it on the bean if possible.
+ *
+ * @param context the camel context
+ * @param bean the bean
+ * @param parameters parameters
+ * @throws Exception is thrown if setting property fails
+ */
+ public static void setReferenceProperties(CamelContext context, Object bean, Map parameters) throws Exception {
+ Iterator it = parameters.keySet().iterator();
+ while (it.hasNext()) {
+ Object key = it.next();
+ String value = (String) parameters.get(key);
+ if (isReferenceParameter(value)) {
+ Object ref = context.getRegistry().lookup(value.substring(1));
+ String name = key.toString();
+ if (ref != null) {
+ boolean hit = IntrospectionSupport.setProperty(context.getTypeConverter(), bean, name, ref);
+ if (hit) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Configued property: " + name + " on bean: " + bean + " with value: " + ref);
+ }
+ // must remove as its a valid option and we could configure it
+ it.remove();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Is the given parameter a reference parameter (starting with a # char)
+ *
+ * @param parameter the parameter
+ * @return <tt>true</tt> if its a reference parameter
+ */
+ public static boolean isReferenceParameter(String parameter) {
+ return parameter != null && parameter.startsWith("#");
+ }
+
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java?rev=789087&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java Sun Jun 28 11:15:47 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumerPollStrategy;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Unit test for poll strategy
+ */
+public class FileConsumerPollStrategyStopOnRollbackTest extends ContextTestSupport {
+
+ private static int counter;
+ private static String event = "";
+
+ private String fileUrl = "file://target/pollstrategy/?pollStrategy=#myPoll";
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myPoll", new MyPollStrategy());
+ return jndi;
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ deleteDirectory("target/pollstrategy");
+ template.sendBodyAndHeader("file:target/pollstrategy/", "Hello World", Exchange.FILE_NAME, "hello.txt");
+ }
+
+ public void testStopOnRollback() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(0);
+
+ // let it run for a little while and since it fails first time we should never get a message
+ Thread.sleep(1000);
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals("rollback", event);
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from(fileUrl).to("mock:result");
+ }
+ };
+ }
+
+ private class MyPollStrategy implements PollingConsumerPollStrategy {
+
+ public void begin(Consumer consumer, Endpoint endpoint) {
+ // start consumer as we simualte the fail in begin
+ // and thus before camel lazy start it itself
+ try {
+ consumer.start();
+ } catch (Exception e) {
+ ObjectHelper.wrapRuntimeCamelException(e);
+ }
+
+ if (counter++ == 0) {
+ // simulate an error on first poll
+ throw new IllegalArgumentException("Damn I cannot do this");
+ }
+ }
+
+ public void commit(Consumer consumer, Endpoint endpoint) {
+ event += "commit";
+ }
+
+ public void rollback(Consumer consumer, Endpoint endpoint, Exception cause) throws Exception {
+ if (cause.getMessage().equals("Damn I cannot do this")) {
+ event += "rollback";
+ // stop consumer as it does not work
+ consumer.stop();
+ }
+ }
+ }
+
+}
\ No newline at end of file
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java (from r789058, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSkipDotFilesTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSkipDotFilesTest.java&r1=789058&r2=789087&rev=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSkipDotFilesTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java Sun Jun 28 11:15:47 2009
@@ -16,47 +16,49 @@
*/
package org.apache.camel.component.file;
+import org.apache.camel.Consumer;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumerPollStrategy;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
/**
- * Unit test that file consumer will skip any files starting with a dot
+ * Unit test for poll strategy
*/
-public class FileConsumerSkipDotFilesTest extends ContextTestSupport {
+public class FileConsumerPollStrategyTest extends ContextTestSupport {
- private String fileUrl = "file://target/dotfiles/";
+ private static int counter;
+ private static String event = "";
+
+ private String fileUrl = "file://target/pollstrategy/?consumer.pollStrategy=#myPoll";
@Override
- protected void setUp() throws Exception {
- deleteDirectory("target/dotfiles");
- super.setUp();
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myPoll", new MyPollStrategy());
+ return jndi;
}
- public void testSkipDotFiles() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMessageCount(0);
-
- template.sendBodyAndHeader("file:target/dotfiles/", "This is a dot file",
- Exchange.FILE_NAME, ".skipme");
-
- mock.setResultWaitTime(2000);
- mock.assertIsSatisfied();
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ deleteDirectory("target/pollstrategy");
+ template.sendBodyAndHeader("file:target/pollstrategy/", "Hello World", Exchange.FILE_NAME, "hello.txt");
}
- public void testSkipDotFilesWithARegularFile() throws Exception {
+ public void testFirstPollRollbackThenCommit() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);
- mock.expectedBodiesReceived("Hello World");
- template.sendBodyAndHeader("file:target/dotfiles/", "This is a dot file",
- Exchange.FILE_NAME, ".skipme");
+ assertMockEndpointsSatisfied();
- template.sendBodyAndHeader("file:target/dotfiles/", "Hello World",
- Exchange.FILE_NAME, "hello.txt");
+ // give poll strategy a bit time to signal commit
+ Thread.sleep(50);
- mock.assertIsSatisfied();
+ assertEquals("rollbackcommit", event);
}
protected RouteBuilder createRouteBuilder() throws Exception {
@@ -67,4 +69,24 @@
};
}
-}
+ private class MyPollStrategy implements PollingConsumerPollStrategy {
+
+ public void begin(Consumer consumer, Endpoint endpoint) {
+ if (counter++ == 0) {
+ // simulate an error on first poll
+ throw new IllegalArgumentException("Damn I cannot do this");
+ }
+ }
+
+ public void commit(Consumer consumer, Endpoint endpoint) {
+ event += "commit";
+ }
+
+ public void rollback(Consumer consumer, Endpoint endpoint, Exception cause) throws Exception {
+ if (cause.getMessage().equals("Damn I cannot do this")) {
+ event += "rollback";
+ }
+ }
+ }
+
+}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java Sun Jun 28 11:15:47 2009
@@ -44,4 +44,9 @@
public void setExceptionToThrowOnPoll(Exception exceptionToThrowOnPoll) {
this.exceptionToThrowOnPoll = exceptionToThrowOnPoll;
}
+
+ @Override
+ public String toString() {
+ return "MockScheduled";
+ }
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java Sun Jun 28 11:15:47 2009
@@ -16,26 +16,52 @@
*/
package org.apache.camel.impl;
+import org.apache.camel.Consumer;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.PollingConsumerPollStrategy;
public class ScheduledPollConsumerTest extends ContextTestSupport {
+
+ private static boolean rollback;
public void testExceptionOnPollAndCanStartAgain() throws Exception {
- Exception expectedException = new Exception("Hello, I should be thrown on shutdown only!");
+
+ final Exception expectedException = new Exception("Hello, I should be thrown on shutdown only!");
MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
+ consumer.setPollStrategy(new PollingConsumerPollStrategy() {
+ public void begin(Consumer consumer, Endpoint endpoint) {
+ }
+
+ public void commit(Consumer consumer, Endpoint endpoint) {
+ }
+
+ public void rollback(Consumer consumer, Endpoint endpoint, Exception e) throws Exception {
+ if (e == expectedException) {
+ rollback = true;
+ }
+
+ }
+ });
+
consumer.start();
// poll that throws an exception
consumer.run();
consumer.stop();
+ assertEquals("Should have rollback", true, rollback);
+
// prepare for 2nd run but this time it should not thrown an exception on poll
+ rollback = false;
consumer.setExceptionToThrowOnPoll(null);
// start it again and we should be able to run
consumer.start();
consumer.run();
// should be able to stop with no problem
consumer.stop();
+
+ assertEquals("Should not have rollback", false, rollback);
}
public void testNoExceptionOnPoll() throws Exception {