You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/28 13:15:48 UTC

svn commit: r789087 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/component/file/ main/java/org/apache/camel/component/file/strategy/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/util/ test/java...

Author: davsclaus
Date: Sun Jun 28 11:15:47 2009
New Revision: 789087

URL: http://svn.apache.org/viewvc?rev=789087&view=rev
Log:
CAMEL-1750: Introduced PollingConsumerPollStrategy to control behavior for scheduled consumer allowing you to handle exceptions related to polling, eg network not reachable etc.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java   (contents, props changed)
      - copied, changed from r789058, camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
      - copied, changed from r789058, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSkipDotFilesTest.java
Removed:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileComponent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MessageSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java?rev=789087&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java Sun Jun 28 11:15:47 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Strategy for a {@link org.apache.camel.PollingConsumer} when polling an {@link org.apache.camel.Endpoint}.
+ * <p/>
+ * This pluggable strategy allows you to plug in different implementations of what to do, most notably what to
+ * do in case the polling goes wrong. This can be handled in the {@link #rollback(Consumer, Endpoint, Exception) rollback}
+ * method.
+ *
+ * @version $Revision$
+ */
+public interface PollingConsumerPollStrategy {
+
+    /**
+     * Called when poll is about to begin
+     *
+     * @param consumer the consumer
+     * @param endpoint the endpoint being consumed
+     */
+    public void begin(Consumer consumer, Endpoint endpoint);
+
+    /**
+     * Called when poll is completed successfully
+     *
+     * @param consumer the consumer
+     * @param endpoint the endpoint being consumed
+     */
+    public void commit(Consumer consumer, Endpoint endpoint);
+
+    /**
+     * Called when poll failed
+     *
+     * @param consumer the consumer
+     * @param endpoint the endpoint being consumed
+     * @param cause the exception that caused the poll to fail
+     * @throws Exception can be used to rethrow the caused exception
+     */
+    public void rollback(Consumer consumer, Endpoint endpoint, Exception cause) throws Exception;
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileComponent.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileComponent.java Sun Jun 28 11:15:47 2009
@@ -22,6 +22,7 @@
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,7 +53,7 @@
 
         // sort by using file language
         String sortBy = getAndRemoveParameter(parameters, "sortBy", String.class);
-        if (isNotEmpty(sortBy) && !isReferenceParameter(sortBy)) {
+        if (isNotEmpty(sortBy) && !EndpointHelper.isReferenceParameter(sortBy)) {
             // we support nested sort groups so they should be chained
             String[] groups = sortBy.split(";");
             Iterator<String> it = ObjectHelper.createIterator(groups);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sun Jun 28 11:15:47 2009
@@ -120,8 +120,6 @@
 
             // process the current exchange
             processExchange(exchange);
-            
-            
         }
         
         // remove the file from the in progress list in case the batch was limited by max messages per poll
@@ -130,7 +128,6 @@
             String key = exchange.getGenericFile().getFileName();
             endpoint.getInProgressRepository().remove(key);
         }
-        
     }
 
     /**
@@ -166,7 +163,7 @@
 
             boolean begin = processStrategy.begin(operations, endpoint, exchange, exchange.getGenericFile());
             if (!begin) {
-                log.warn(endpoint + " cannot begin processing file: " + exchange.getGenericFile());
+                log.debug(endpoint + " cannot begin processing file: " + exchange.getGenericFile());
                 // remove file from the in progress list as its no longer in progress
                 endpoint.getInProgressRepository().remove(exchange.getGenericFile().getFileName());
                 return;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Sun Jun 28 11:15:47 2009
@@ -150,10 +150,6 @@
         }
     }
 
-    public void setGenericFileProcessStrategy(GenericFileProcessStrategy<T> genericFileProcessStrategy) {
-        this.processStrategy = genericFileProcessStrategy;
-    }
-
     public boolean isNoop() {
         return noop;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java Sun Jun 28 11:15:47 2009
@@ -17,7 +17,7 @@
 package org.apache.camel.component.file;
 
 /**
- * Represents a strategy for marking that a remote file is processed.
+ * Represents a pluggable strategy when processing files.
  */
 public interface GenericFileProcessStrategy<T> {
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java Sun Jun 28 11:15:47 2009
@@ -48,7 +48,7 @@
             strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
             return strategy;
         } else if (moveExpression != null || preMoveExpression != null) {
-            FileRenameProcessStrategy<File> strategy = new FileRenameProcessStrategy<File>();
+            GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<File>();
             strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
             if (moveExpression != null) {
                 GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<File>();
@@ -63,7 +63,7 @@
             return strategy;
         } else {
             // default strategy will move files in a .camel/ subfolder where the file was consumed
-            FileRenameProcessStrategy<File> strategy = new FileRenameProcessStrategy<File>();
+            GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<File>();
             strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
             // use context to lookup language to let it be loose coupled
             Language language = context.resolveLanguage("file");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java Sun Jun 28 11:15:47 2009
@@ -47,7 +47,7 @@
             strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
             return strategy;
         } else if (moveExpression != null || preMoveExpression != null) {
-            FileRenameProcessStrategy strategy = new FileRenameProcessStrategy();
+            GenericFileRenameProcessStrategy strategy = new GenericFileRenameProcessStrategy();
             strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
             if (moveExpression != null) {
                 GenericFileExpressionRenamer renamer = new GenericFileExpressionRenamer();

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java?rev=789087&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java Sun Jun 28 11:15:47 2009
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
+import org.apache.camel.component.file.GenericFileOperations;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Acquires exclusive read lock to the given file. Will wait until the lock is granted.
+ * After the read lock is granted it is released; we just want to make sure that when we start
+ * consuming the file it's not currently in the process of being written by a third party.
+ */
+public class GenericFileRenameExclusiveReadLockStrategy<T> implements GenericFileExclusiveReadLockStrategy<T> {
+    private static final transient Log LOG = LogFactory.getLog(GenericFileRenameExclusiveReadLockStrategy.class);
+    private long timeout;
+
+    public boolean acquireExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file,
+                                            Exchange exchange) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for exclusive read lock to file: " + file);
+        }
+
+        // the trick is to try to rename the file, if we can rename then we have exclusive read
+        // since its a Generic file we cannot use java.nio to get a RW lock
+        String newName = file.getFileName() + ".camelExclusiveReadLock";
+
+        // clone and change the name
+        GenericFile<T> newFile = file.clone();
+        newFile.changeFileName(newName);
+
+        long start = System.currentTimeMillis();
+
+        boolean exclusive = false;
+        while (!exclusive) {
+             // timeout check
+            if (timeout > 0) {
+                long delta = System.currentTimeMillis() - start;
+                if (delta > timeout) {
+                    LOG.debug("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
+                    // we could not get the lock within the timeout period, so return false
+                    return false;
+                }
+            }
+            
+            exclusive = operations.renameFile(file.getAbsoluteFilePath(), newFile.getAbsoluteFilePath());           
+            if (exclusive) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Acquired exclusive read lock to file: " + file);
+                }
+                // rename it back so we can read it
+                operations.renameFile(newFile.getAbsoluteFilePath(), file.getAbsoluteFilePath());
+            } else {
+                boolean interrupted = sleep();
+                if (interrupted) {
+                    // we were interrupted while sleeping, we are likely being shutdown so return false
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
+    public void releaseExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file,
+                                         Exchange exchange) throws Exception {
+        // noop
+    }
+
+    private boolean sleep() {        
+        LOG.trace("Exclusive read lock not granted. Sleeping for 1000 millis.");
+        try {
+            Thread.sleep(1000);
+            return false;
+        } catch (InterruptedException e) {            
+            LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out");
+            return true;
+        }
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * Sets an optional timeout period.
+     * <p/>
+     * If the read lock could not be granted within the time period then the wait is stopped and
+     * <tt>acquireExclusiveReadLock</tt> returns <tt>false</tt>.
+     *
+     * @param timeout period in millis
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java (from r789058, camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java&r1=789058&r2=789087&rev=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java Sun Jun 28 11:15:47 2009
@@ -24,11 +24,11 @@
 import org.apache.camel.component.file.GenericFileOperationFailedException;
 import org.apache.camel.component.file.GenericFileOperations;
 
-public class FileRenameProcessStrategy<T> extends GenericFileProcessStrategySupport<T> {
+public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrategySupport<T> {
     private GenericFileRenamer<T> beginRenamer;
     private GenericFileRenamer<T> commitRenamer;
 
-    public FileRenameProcessStrategy() {
+    public GenericFileRenameProcessStrategy() {
     }
 
     @Override

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java Sun Jun 28 11:15:47 2009
@@ -31,6 +31,7 @@
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
+import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -209,43 +210,8 @@
      */
     protected void setProperties(Object bean, Map parameters) throws Exception {        
         // set reference properties first as they use # syntax that fools the regular properties setter
-        setReferenceProperties(bean, parameters);
-        IntrospectionSupport.setProperties(getCamelContext().getTypeConverter(), bean, parameters);      
-    }
-
-    /**
-     * Sets the reference properties on the given bean
-     * <p/>
-     * This is convention over configuration, setting all reference parameters (using {@link #isReferenceParameter(String)}
-     * by looking it up in registry and setting it on the bean if possible.
-     */
-    protected void setReferenceProperties(Object bean, Map parameters) throws Exception {
-        Iterator it = parameters.keySet().iterator();
-        while (it.hasNext()) {
-            Object key = it.next();
-            String value = (String) parameters.get(key);
-            if (isReferenceParameter(value)) {
-                Object ref = lookup(value.substring(1));
-                String name = key.toString();
-                if (ref != null) {
-                    boolean hit = IntrospectionSupport.setProperty(getCamelContext().getTypeConverter(), bean, name, ref);
-                    if (hit) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Configued property: " + name + " on bean: " + bean + " with value: " + ref);
-                        }
-                        // must remove as its a valid option and we could configure it
-                        it.remove();
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Is the given parameter a reference parameter (starting with a # char)
-     */
-    protected boolean isReferenceParameter(String parameter) {
-        return parameter != null && parameter.startsWith("#");
+        EndpointHelper.setReferenceProperties(getCamelContext(), bean, parameters);
+        EndpointHelper.setProperties(getCamelContext(), bean, parameters);
     }
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Sun Jun 28 11:15:47 2009
@@ -21,6 +21,8 @@
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A default consumer useful for implementation inheritance.
@@ -28,6 +30,7 @@
  * @version $Revision$
  */
 public class DefaultConsumer extends ServiceSupport implements Consumer {
+    private final transient Log log = LogFactory.getLog(getClass());
     private final Endpoint endpoint;
     private final Processor processor;
     private ExceptionHandler exceptionHandler;
@@ -62,10 +65,16 @@
     }
 
     protected void doStop() throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Stopping consumer: " + this);
+        }
         ServiceHelper.stopServices(processor);
     }
 
     protected void doStart() throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Starting consumer: " + this);
+        }
         ServiceHelper.startServices(processor);
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java Sun Jun 28 11:15:47 2009
@@ -153,6 +153,13 @@
 
     public <T> T getProperty(String name, Class<T> type) {
         Object value = getProperty(name);
+
+        // eager same instance type test to avoid the overhead of invoking the type converter
+        // if already same type
+        if (type.isInstance(value)) {
+            return type.cast(value);
+        }
+
         return ExchangeHelper.convertToType(this, type, value);
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java Sun Jun 28 11:15:47 2009
@@ -44,8 +44,19 @@
 
     public <T> T getHeader(String name, Class<T> type) {
         Object value = getHeader(name);
+
+        // eager same instance type test to avoid the overhead of invoking the type converter
+        // if already same type
+        if (type.isInstance(value)) {
+            return type.cast(value);
+        }
+
         Exchange e = getExchange();
-        return e.getContext().getTypeConverter().convertTo(type, e, value);
+        if (e != null) {
+            return e.getContext().getTypeConverter().convertTo(type, e, value);
+        } else {
+            return (T) value;
+        }
     }
 
     public void setHeader(String name, Object value) {

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java?rev=789087&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java Sun Jun 28 11:15:47 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.PollingConsumerPollStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A default implementation that just logs a <tt>WARN</tt> level log in case of rollback.
+ *
+ * @version $Revision$
+ */
+public class DefaultPollingConsumerPollStrategy implements PollingConsumerPollStrategy {
+
+    private static final transient Log LOG = LogFactory.getLog(DefaultPollingConsumerPollStrategy.class);
+
+    public void begin(Consumer consumer, Endpoint endpoint) {
+        // noop
+    }
+
+    public void commit(Consumer consumer, Endpoint endpoint) {
+        // noop
+    }
+
+    public void rollback(Consumer consumer, Endpoint endpoint, Exception e) throws Exception {
+        LOG.warn("Consumer " + consumer +  " could not poll endpoint: " + endpoint.getEndpointUri() + " caused by: " + e.getMessage(), e);
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java Sun Jun 28 11:15:47 2009
@@ -20,6 +20,8 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Producer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A default implementation of @{link Producer} for implementation inheritence
@@ -27,6 +29,7 @@
  * @version $Revision$
  */
 public abstract class DefaultProducer extends ServiceSupport implements Producer {
+    private final transient Log log = LogFactory.getLog(getClass());
     private final Endpoint endpoint;
 
     public DefaultProducer(Endpoint endpoint) {
@@ -59,8 +62,14 @@
     }
 
     protected void doStart() throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Starting producer: " + this);
+        }
     }
 
     protected void doStop() throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Stopping producer: " + this);
+        }
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java Sun Jun 28 11:15:47 2009
@@ -20,7 +20,6 @@
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MessageSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MessageSupport.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MessageSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MessageSupport.java Sun Jun 28 11:15:47 2009
@@ -58,7 +58,8 @@
     }
 
     protected <T> T getBody(Class<T> type, Object body) {
-        // same instance type
+        // eager same instance type test to avoid the overhead of invoking the type converter
+        // if already same type
         if (type.isInstance(body)) {
             return type.cast(body);
         }
@@ -88,6 +89,12 @@
     }
 
     public <T> T getMandatoryBody(Class<T> type) throws InvalidPayloadException {
+        // eager same instance type test to avoid the overhead of invoking the type converter
+        // if already same type
+        if (type.isInstance(body)) {
+            return type.cast(body);
+        }
+
         Exchange e = getExchange();
         if (e != null) {
             TypeConverter converter = e.getContext().getTypeConverter();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Sun Jun 28 11:15:47 2009
@@ -22,6 +22,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.PollingConsumerPollStrategy;
 import org.apache.camel.Processor;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
@@ -44,6 +45,7 @@
     private long delay = 500;
     private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
     private boolean useFixedDelay;
+    private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
 
     public ScheduledPollConsumer(DefaultEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -75,10 +77,16 @@
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Starting to poll: " + this.getEndpoint());
                 }
+                pollStrategy.begin(this, getEndpoint());
                 poll();
+                pollStrategy.commit(this, getEndpoint());
             }
         } catch (Exception e) {
-            LOG.warn("An exception occurred while polling: " + this.getEndpoint() + ": " + e.getMessage(), e);
+            try {
+                pollStrategy.rollback(this, getEndpoint(), e);
+            } catch (Exception re) {
+                throw ObjectHelper.wrapRuntimeCamelException(re);
+            }
         }
 
         if (LOG.isTraceEnabled()) {
@@ -120,6 +128,14 @@
         this.useFixedDelay = useFixedDelay;
     }
 
+    public PollingConsumerPollStrategy getPollStrategy() {
+        return pollStrategy;
+    }
+
+    public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) {
+        this.pollStrategy = pollStrategy;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java Sun Jun 28 11:15:47 2009
@@ -23,6 +23,7 @@
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.IntrospectionSupport;
 
 /**
@@ -58,7 +59,9 @@
 
     protected void configureConsumer(Consumer consumer) throws Exception {
         if (consumerProperties != null) {
-            IntrospectionSupport.setProperties(getCamelContext().getTypeConverter(), consumer, consumerProperties);
+            // set reference properties first as they use # syntax that fools the regular properties setter
+            EndpointHelper.setReferenceProperties(getCamelContext(), consumer, consumerProperties);
+            EndpointHelper.setProperties(getCamelContext(), consumer, consumerProperties);
             if (!this.isLenientProperties() && consumerProperties.size() > 0) {
                 throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + consumerProperties.size()
                     + " parameters that couldn't be set on the endpoint consumer."
@@ -84,7 +87,8 @@
         Object delay = options.remove("delay");
         Object timeUnit = options.remove("timeUnit");
         Object useFixedDelay = options.remove("useFixedDelay");
-        if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null) {
+        Object pollStrategy = options.remove("pollStrategy");
+        if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null) {
             if (consumerProperties == null) {
                 consumerProperties = new HashMap();
             }
@@ -100,6 +104,9 @@
             if (useFixedDelay != null) {
                 consumerProperties.put("useFixedDelay", useFixedDelay);
             }
+            if (pollStrategy != null) {
+                consumerProperties.put("pollStrategy", pollStrategy);
+            }
         }
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/EndpointHelper.java Sun Jun 28 11:15:47 2009
@@ -16,8 +16,11 @@
  */
 package org.apache.camel.util;
 
+import java.util.Iterator;
+import java.util.Map;
 import java.util.regex.PatternSyntaxException;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.PollingConsumer;
@@ -113,4 +116,59 @@
         return false;
     }
 
+    /**
+     * Sets the regular properties on the given bean, delegating to {@link IntrospectionSupport} with the type converter of the context
+     *
+     * @param context the camel context, used to obtain the type converter
+     * @param bean the bean to configure
+     * @param parameters parameters to set as properties on the bean
+     * @throws Exception is thrown if setting property fails
+     */
+    public static void setProperties(CamelContext context, Object bean, Map parameters) throws Exception {
+        IntrospectionSupport.setProperties(context.getTypeConverter(), bean, parameters);
+    }
+
+    /**
+     * Sets the reference properties on the given bean
+     * <p/>
+     * This is convention over configuration, setting all reference parameters (using {@link #isReferenceParameter(String)})
+     * by looking it up in registry and setting it on the bean if possible. Non String values are never references.
+     *
+     * @param context the camel context
+     * @param bean the bean
+     * @param parameters parameters, the reference parameters that could be set on the bean are removed from it
+     * @throws Exception is thrown if setting property fails
+     */
+    public static void setReferenceProperties(CamelContext context, Object bean, Map parameters) throws Exception {
+        Iterator it = parameters.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry entry = (Map.Entry) it.next();
+            // only String values can use the # syntax; a blind cast would fail with a ClassCastException
+            // on any other value, so those are skipped here and left for the regular properties setter
+            String value = entry.getValue() instanceof String ? (String) entry.getValue() : null;
+            if (isReferenceParameter(value)) {
+                // the registry name is the value without the leading #
+                Object ref = context.getRegistry().lookup(value.substring(1));
+                String name = entry.getKey().toString();
+                if (ref != null && IntrospectionSupport.setProperty(context.getTypeConverter(), bean, name, ref)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Configured property: " + name + " on bean: " + bean + " with value: " + ref);
+                    }
+                    // must remove as its a valid option and we could configure it
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks whether the given parameter uses the reference syntax (its value begins with a # char)
+     *
+     * @param parameter the parameter value to check, may be <tt>null</tt>
+     * @return <tt>true</tt> if the parameter is not <tt>null</tt> and begins with #, <tt>false</tt> otherwise
+     */
+    public static boolean isReferenceParameter(String parameter) {
+        return parameter == null ? false : parameter.startsWith("#");
+    }
+
 }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java?rev=789087&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java Sun Jun 28 11:15:47 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumerPollStrategy;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Unit test that a poll strategy can stop the consumer on rollback so no message is ever routed
+ */
+public class FileConsumerPollStrategyStopOnRollbackTest extends ContextTestSupport {
+    // shared with the inner poll strategy: counts the begin calls and records the commit/rollback events
+    private static int counter;
+    private static String event = "";
+
+    private String fileUrl = "file://target/pollstrategy/?pollStrategy=#myPoll";
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myPoll", new MyPollStrategy());
+        return jndi;
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        deleteDirectory("target/pollstrategy");
+        template.sendBodyAndHeader("file:target/pollstrategy/", "Hello World", Exchange.FILE_NAME, "hello.txt");
+    }
+
+    public void testStopOnRollback() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        // let it run for a little while and since it fails first time we should never get a message
+        Thread.sleep(1000);
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("rollback", event);
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(fileUrl).to("mock:result");
+            }
+        };
+    }
+
+    private class MyPollStrategy implements PollingConsumerPollStrategy {
+        // fails the first poll in begin, records commit/rollback in event and stops the consumer on rollback
+        public void begin(Consumer consumer, Endpoint endpoint) {
+            // start consumer as we simulate the fail in begin
+            // and thus before camel lazy start it itself
+            try {
+                consumer.start();
+            } catch (Exception e) {
+                throw ObjectHelper.wrapRuntimeCamelException(e);
+            }
+
+            if (counter++ == 0) {
+                // simulate an error on first poll
+                throw new IllegalArgumentException("Damn I cannot do this");
+            }
+        }
+
+        public void commit(Consumer consumer, Endpoint endpoint) {
+            event += "commit";
+        }
+
+        public void rollback(Consumer consumer, Endpoint endpoint, Exception cause) throws Exception {
+            if ("Damn I cannot do this".equals(cause.getMessage())) {
+                event += "rollback";
+                // stop consumer as it does not work
+                consumer.stop();
+            }
+        }
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java (from r789058, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSkipDotFilesTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSkipDotFilesTest.java&r1=789058&r2=789087&rev=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSkipDotFilesTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java Sun Jun 28 11:15:47 2009
@@ -16,47 +16,49 @@
  */
 package org.apache.camel.component.file;
 
+import org.apache.camel.Consumer;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumerPollStrategy;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
 
 /**
- * Unit test that file consumer will skip any files starting with a dot
+ * Unit test for poll strategy
  */
-public class FileConsumerSkipDotFilesTest extends ContextTestSupport {
+public class FileConsumerPollStrategyTest extends ContextTestSupport {
 
-    private String fileUrl = "file://target/dotfiles/";
+    private static int counter;
+    private static String event = "";
+
+    private String fileUrl = "file://target/pollstrategy/?consumer.pollStrategy=#myPoll";
 
     @Override
-    protected void setUp() throws Exception {
-        deleteDirectory("target/dotfiles");
-        super.setUp();
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myPoll", new MyPollStrategy());
+        return jndi;
     }
 
-    public void testSkipDotFiles() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(0);
-
-        template.sendBodyAndHeader("file:target/dotfiles/", "This is a dot file",
-            Exchange.FILE_NAME, ".skipme");
-
-        mock.setResultWaitTime(2000);
-        mock.assertIsSatisfied();
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        deleteDirectory("target/pollstrategy");
+        template.sendBodyAndHeader("file:target/pollstrategy/", "Hello World", Exchange.FILE_NAME, "hello.txt");
     }
 
-    public void testSkipDotFilesWithARegularFile() throws Exception {
+    public void testFirstPollRollbackThenCommit() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
-        mock.expectedBodiesReceived("Hello World");
 
-        template.sendBodyAndHeader("file:target/dotfiles/", "This is a dot file",
-            Exchange.FILE_NAME, ".skipme");
+        assertMockEndpointsSatisfied();
 
-        template.sendBodyAndHeader("file:target/dotfiles/", "Hello World",
-            Exchange.FILE_NAME, "hello.txt");
+        // give poll strategy a bit time to signal commit
+        Thread.sleep(50);
 
-        mock.assertIsSatisfied();
+        assertEquals("rollbackcommit", event);
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -67,4 +69,24 @@
         };
     }
 
-}
+    private class MyPollStrategy implements PollingConsumerPollStrategy {
+        // fails the first poll in begin and records the commit/rollback callbacks in event
+        public void begin(Consumer consumer, Endpoint endpoint) {
+            if (counter++ == 0) {
+                // simulate an error on first poll
+                throw new IllegalArgumentException("Damn I cannot do this");
+            }
+        }
+
+        public void commit(Consumer consumer, Endpoint endpoint) {
+            event += "commit";
+        }
+
+        public void rollback(Consumer consumer, Endpoint endpoint, Exception cause) throws Exception {
+            if ("Damn I cannot do this".equals(cause.getMessage())) {
+                event += "rollback";
+            }
+        }
+    }
+
+}
\ No newline at end of file

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java Sun Jun 28 11:15:47 2009
@@ -44,4 +44,9 @@
     public void setExceptionToThrowOnPoll(Exception exceptionToThrowOnPoll) {
         this.exceptionToThrowOnPoll = exceptionToThrowOnPoll;
     }
+
+    @Override
+    public String toString() {
+        return "MockScheduled";
+    }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java?rev=789087&r1=789086&r2=789087&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java Sun Jun 28 11:15:47 2009
@@ -16,26 +16,52 @@
  */
 package org.apache.camel.impl;
 
+import org.apache.camel.Consumer;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.PollingConsumerPollStrategy;
 
 public class ScheduledPollConsumerTest extends ContextTestSupport {
+
+    private static boolean rollback;
     
     public void testExceptionOnPollAndCanStartAgain() throws Exception {
-        Exception expectedException = new Exception("Hello, I should be thrown on shutdown only!");
+
+        final Exception expectedException = new Exception("Hello, I should be thrown on shutdown only!");
         MockScheduledPollConsumer consumer = new MockScheduledPollConsumer(expectedException);
 
+        consumer.setPollStrategy(new PollingConsumerPollStrategy() {
+            public void begin(Consumer consumer, Endpoint endpoint) {
+            }
+
+            public void commit(Consumer consumer, Endpoint endpoint) {
+            }
+
+            public void rollback(Consumer consumer, Endpoint endpoint, Exception e) throws Exception {
+                if (e == expectedException) {
+                    rollback = true;
+                }
+
+            }
+        });
+
         consumer.start();
         // poll that throws an exception
         consumer.run();
         consumer.stop();
 
+        assertEquals("Should have rollback", true, rollback);
+
         // prepare for 2nd run but this time it should not thrown an exception on poll
+        rollback = false;
         consumer.setExceptionToThrowOnPoll(null);
         // start it again and we should be able to run
         consumer.start();
         consumer.run();
         // should be able to stop with no problem
         consumer.stop();
+
+        assertEquals("Should not have rollback", false, rollback);
     }
     
     public void testNoExceptionOnPoll() throws Exception {