You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/01/13 13:01:06 UTC

svn commit: r1058525 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/spi/ camel-core/src/t...

Author: davsclaus
Date: Thu Jan 13 12:01:05 2011
New Revision: 1058525

URL: http://svn.apache.org/viewvc?rev=1058525&view=rev
Log:
CAMEL-3538: PollingConsumerPollStrategy now has the actual number of messages polled in its commit callback. BatchConsumer adjusted to return number of messages polled.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java
      - copied, changed from r1058473, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/PollingConsumerPollStrategy.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
    camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
    camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpSimulateNetworkIssueRecoverTest.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerThrowExceptionOnLoginFailedTest.java
    camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
    camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
    camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
    camel/trunk/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpOIDPoller.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java Thu Jan 13 12:01:05 2011
@@ -46,9 +46,10 @@ public interface BatchConsumer extends C
      * data alongside the Exchange.
      *
      * @param exchanges list of items in this batch
+     * @return number of messages actually processed
      * @throws Exception if an internal processing error has occurred.
      */
-    void processBatch(Queue<Object> exchanges) throws Exception;
+    int processBatch(Queue<Object> exchanges) throws Exception;
 
     /**
      * Whether processing the batch is still allowed.

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Thu Jan 13 12:01:05 2011
@@ -59,7 +59,7 @@ public abstract class GenericFileConsume
     /**
      * Poll for files
      */
-    protected void poll() throws Exception {
+    protected int poll() throws Exception {
         // must reset for each poll
         fileExpressionResult = null;
         shutdownRunningTask = null;
@@ -71,7 +71,7 @@ public abstract class GenericFileConsume
             if (log.isDebugEnabled()) {
                 log.debug("Skipping poll as pre poll check returned false");
             }
-            return;
+            return 0;
         }
 
         // gather list of files to process
@@ -118,9 +118,11 @@ public abstract class GenericFileConsume
         }
 
         Queue<Exchange> q = exchanges;
-        processBatch(CastUtils.cast(q));
+        int polledMessages = processBatch(CastUtils.cast(q));
 
         postPollCheck();
+
+        return polledMessages;
     }
 
     public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
@@ -128,7 +130,7 @@ public abstract class GenericFileConsume
     }
 
     @SuppressWarnings("unchecked")
-    public void processBatch(Queue<Object> exchanges) {
+    public int processBatch(Queue<Object> exchanges) {
         int total = exchanges.size();
 
         // limit if needed
@@ -162,6 +164,8 @@ public abstract class GenericFileConsume
             String key = file.getAbsoluteFilePath();
             endpoint.getInProgressRepository().remove(key);
         }
+
+        return total;
     }
 
     public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java Thu Jan 13 12:01:05 2011
@@ -37,7 +37,7 @@ public class DefaultPollingConsumerPollS
         return true;
     }
 
-    public void commit(Consumer consumer, Endpoint endpoint) {
+    public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
         // noop
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java Thu Jan 13 12:01:05 2011
@@ -41,13 +41,20 @@ public class DefaultScheduledPollConsume
         super(endpoint, processor, executor);
     }
 
-    protected void poll() throws Exception {
+    protected int poll() throws Exception {
+        int messagesPolled = 0;
+
         while (isPollAllowed()) {
             Exchange exchange = pollingConsumer.receiveNoWait();
             if (exchange == null) {
                 break;
             }
 
+            messagesPolled++;
+            if (log.isTraceEnabled()) {
+                log.trace("Polled " + messagesPolled + " " + exchange);
+            }
+
             // if the result of the polled exchange has output we should create a new exchange and
             // use the output as input to the next processor
             if (exchange.hasOut()) {
@@ -58,6 +65,8 @@ public class DefaultScheduledPollConsume
             }
             getProcessor().process(exchange);
         }
+
+        return messagesPolled;
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java Thu Jan 13 12:01:05 2011
@@ -55,7 +55,7 @@ public class LimitedPollingConsumerPollS
     }
 
     @Override
-    public void commit(Consumer consumer, Endpoint endpoint) {
+    public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
         // we could commit so clear state
         state.remove(consumer);
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Thu Jan 13 12:01:05 2011
@@ -94,8 +94,8 @@ public abstract class ScheduledPollConsu
                     boolean begin = pollStrategy.begin(this, getEndpoint());
                     if (begin) {
                         retryCounter++;
-                        poll();
-                        pollStrategy.commit(this, getEndpoint());
+                        int polledMessages = poll();
+                        pollStrategy.commit(this, getEndpoint(), polledMessages);
                     } else {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Cannot begin polling as pollStrategy returned false: " + pollStrategy);
@@ -183,10 +183,11 @@ public abstract class ScheduledPollConsu
 
     /**
      * The polling method which is invoked periodically to poll this consumer
-     * 
+     *
+     * @return number of messages polled, will be <tt>0</tt> if no message was polled at all.
      * @throws Exception can be thrown if an exception occurred during polling
      */
-    protected abstract void poll() throws Exception;
+    protected abstract int poll() throws Exception;
 
     @Override
     protected void doStart() throws Exception {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/PollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/PollingConsumerPollStrategy.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/PollingConsumerPollStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/PollingConsumerPollStrategy.java Thu Jan 13 12:01:05 2011
@@ -44,8 +44,9 @@ public interface PollingConsumerPollStra
      *
      * @param consumer the consumer
      * @param endpoint the endpoint being consumed
+     * @param polledMessages number of messages polled, will be <tt>0</tt> if no message was polled at all.
      */
-    void commit(Consumer consumer, Endpoint endpoint);
+    void commit(Consumer consumer, Endpoint endpoint, int polledMessages);
 
     /**
      * Called when poll failed

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java Thu Jan 13 12:01:05 2011
@@ -82,7 +82,7 @@ public class FileConsumerPollStrategyNot
             return true;
         }
 
-        public void commit(Consumer consumer, Endpoint endpoint) {
+        public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
             event += "commit";
         }
 

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java (from r1058473, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java&r1=1058473&r2=1058525&rev=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java Thu Jan 13 12:01:05 2011
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.file;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
@@ -28,12 +31,12 @@ import org.apache.camel.spi.PollingConsu
 /**
  * Unit test for poll strategy
  */
-public class FileConsumerPollStrategyTest extends ContextTestSupport {
+public class FileConsumerPollStrategyPolledMessagesTest extends ContextTestSupport {
 
-    private static int counter;
-    private static String event = "";
+    private static int maxPolls;
+    private final CountDownLatch latch = new CountDownLatch(1);
 
-    private String fileUrl = "file://target/pollstrategy/?consumer.pollStrategy=#myPoll&noop=true&initialDelay=0&delay=10";
+    private String fileUrl = "file://target/pollstrategy/?consumer.pollStrategy=#myPoll";
 
     @Override
     protected JndiRegistry createRegistry() throws Exception {
@@ -46,27 +49,31 @@ public class FileConsumerPollStrategyTes
     protected void setUp() throws Exception {
         super.setUp();
         deleteDirectory("target/pollstrategy");
-        template.sendBodyAndHeader("file:target/pollstrategy/", "Hello World", Exchange.FILE_NAME, "hello.txt");
     }
 
-    public void testFirstPollRollbackThenCommit() throws Exception {
+    public void testPolledMessages() throws Exception {
+        template.sendBodyAndHeader("file:target/pollstrategy/", "Hello World", Exchange.FILE_NAME, "hello.txt");
+        template.sendBodyAndHeader("file:target/pollstrategy/", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+        // start route now files have been created
+        context.startRoute("foo");
+
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(1);
+        mock.expectedMessageCount(2);
 
         assertMockEndpointsSatisfied();
 
-        oneExchangeDone.matchesMockWaitTime();
+        // wait for commit to be issued
+        latch.await(5, TimeUnit.SECONDS);
 
-        // give file consumer a bit time
-        Thread.sleep(20);
-
-        assertTrue(event.startsWith("rollbackcommit"));
+        assertEquals(2, maxPolls);
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from(fileUrl).convertBodyTo(String.class).to("mock:result");
+                from(fileUrl).routeId("foo").noAutoStartup()
+                    .convertBodyTo(String.class).to("mock:result");
             }
         };
     }
@@ -74,21 +81,17 @@ public class FileConsumerPollStrategyTes
     private class MyPollStrategy implements PollingConsumerPollStrategy {
 
         public boolean begin(Consumer consumer, Endpoint endpoint) {
-            if (counter++ == 0) {
-                // simulate an error on first poll
-                throw new IllegalArgumentException("Damn I cannot do this");
-            }
             return true;
         }
 
-        public void commit(Consumer consumer, Endpoint endpoint) {
-            event += "commit";
+        public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
+            if (polledMessages > maxPolls) {
+                maxPolls = polledMessages;
+            }
+            latch.countDown();
         }
 
         public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception cause) throws Exception {
-            if (cause.getMessage().equals("Damn I cannot do this")) {
-                event += "rollback";
-            }
             return false;
         }
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java Thu Jan 13 12:01:05 2011
@@ -70,7 +70,7 @@ public class FileConsumerPollStrategyRol
             throw new IllegalArgumentException("Damn I cannot do this");
         }
 
-        public void commit(Consumer consumer, Endpoint endpoint) {
+        public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
             event += "commit";
         }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java Thu Jan 13 12:01:05 2011
@@ -87,7 +87,7 @@ public class FileConsumerPollStrategySto
             return true;
         }
 
-        public void commit(Consumer consumer, Endpoint endpoint) {
+        public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
             event += "commit";
         }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java Thu Jan 13 12:01:05 2011
@@ -81,7 +81,7 @@ public class FileConsumerPollStrategyTes
             return true;
         }
 
-        public void commit(Consumer consumer, Endpoint endpoint) {
+        public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
             event += "commit";
         }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java Thu Jan 13 12:01:05 2011
@@ -35,10 +35,11 @@ public class MockScheduledPollConsumer e
     }
     
     @Override
-    protected void poll() throws Exception {
+    protected int poll() throws Exception {
         if (exceptionToThrowOnPoll != null) {
             throw exceptionToThrowOnPoll;
         }
+        return 0;
     }
 
     public void setExceptionToThrowOnPoll(Exception exceptionToThrowOnPoll) {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java Thu Jan 13 12:01:05 2011
@@ -37,7 +37,7 @@ public class ScheduledPollConsumerTest e
                 return true;
             }
 
-            public void commit(Consumer consumer, Endpoint endpoint) {
+            public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
             }
 
             public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
@@ -79,7 +79,7 @@ public class ScheduledPollConsumerTest e
                 return true;
             }
 
-            public void commit(Consumer consumer, Endpoint endpoint) {
+            public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
                 event += "commit";
             }
 

Modified: camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java (original)
+++ camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java Thu Jan 13 12:01:05 2011
@@ -40,15 +40,17 @@ public abstract class FeedEntryPollingCo
         this.throttleEntries = throttleEntries;
     }
 
-    public void poll() throws Exception {
+    public int poll() throws Exception {
         if (feed == null) {
             // populate new feed
             feed = createFeed();
             populateList(feed);
         }
 
+        int polledMessages = 0;
         while (hasNextEntry()) {
             Object entry = list.get(entryIndex--);
+            polledMessages++;
 
             boolean valid = true;
             if (entryFilter != null) {
@@ -59,7 +61,7 @@ public abstract class FeedEntryPollingCo
                 getProcessor().process(exchange);
                 if (this.throttleEntries) {
                     // return and wait for the next poll to continue from last time (this consumer is stateful)
-                    return;
+                    return polledMessages;
                 }
             }
         }
@@ -67,6 +69,8 @@ public abstract class FeedEntryPollingCo
         // reset feed and list to be able to poll again
         feed = null;
         resetList();
+
+        return polledMessages;
     }
 
     protected abstract EntryFilter createEntryFilter(Date lastUpdate);

Modified: camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java (original)
+++ camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java Thu Jan 13 12:01:05 2011
@@ -32,9 +32,15 @@ public abstract class FeedPollingConsume
         this.endpoint = endpoint;
     }
 
-    protected void poll() throws Exception {
-        Exchange exchange = endpoint.createExchange(createFeed());
-        getProcessor().process(exchange);
+    protected int poll() throws Exception {
+        Object feed = createFeed();
+        if (feed != null) {
+            Exchange exchange = endpoint.createExchange(feed);
+            getProcessor().process(exchange);
+            return 1;
+        } else {
+            return 0;
+        }
     }
 
     protected abstract Object createFeed() throws Exception;

Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Thu Jan 13 12:01:05 2011
@@ -59,7 +59,7 @@ public class SqsConsumer extends Schedul
     }
 
     @Override
-    protected void poll() throws Exception {
+    protected int poll() throws Exception {
         // must reset for each poll
         shutdownRunningTask = null;
         pendingExchanges = 0;
@@ -71,7 +71,7 @@ public class SqsConsumer extends Schedul
         ReceiveMessageResult messageResult = getClient().receiveMessage(request);
         
         Queue<Exchange> exchanges = createExchanges(messageResult.getMessages());
-        processBatch(CastUtils.cast(exchanges));
+        return processBatch(CastUtils.cast(exchanges));
     }
     
     protected Queue<Exchange> createExchanges(List<Message> messages) {
@@ -88,7 +88,7 @@ public class SqsConsumer extends Schedul
         return answer;
     }
     
-    public void processBatch(Queue<Object> exchanges) throws Exception {
+    public int processBatch(Queue<Object> exchanges) throws Exception {
         int total = exchanges.size();
 
         for (int index = 0; index < total && isBatchAllowed(); index++) {
@@ -124,6 +124,8 @@ public class SqsConsumer extends Schedul
 
             getProcessor().process(exchange);
         }
+
+        return total;
     }
     
     /**

Modified: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpSimulateNetworkIssueRecoverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpSimulateNetworkIssueRecoverTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpSimulateNetworkIssueRecoverTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpSimulateNetworkIssueRecoverTest.java Thu Jan 13 12:01:05 2011
@@ -72,7 +72,7 @@ public class FromFtpSimulateNetworkIssue
     public class MyPollStrategy extends RemoteFilePollingConsumerPollStrategy {
 
         @Override
-        public void commit(Consumer consumer, Endpoint endpoint) {
+        public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
             counter++;
             if (counter < 3) {
                 throw new IllegalArgumentException("Forced by unit test");

Modified: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerThrowExceptionOnLoginFailedTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerThrowExceptionOnLoginFailedTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerThrowExceptionOnLoginFailedTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerThrowExceptionOnLoginFailedTest.java Thu Jan 13 12:01:05 2011
@@ -78,7 +78,7 @@ public class FtpConsumerThrowExceptionOn
             return true;
         }
 
-        public void commit(Consumer consumer, Endpoint endpoint) {
+        public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
         }
 
         public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception cause) throws Exception {

Modified: camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java (original)
+++ camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java Thu Jan 13 12:01:05 2011
@@ -142,8 +142,7 @@ public class IBatisPollingConsumer exten
      * Polls the database
      */
     @Override
-    protected void poll() throws Exception {
-
+    protected int poll() throws Exception {
         // must reset for each poll
         shutdownRunningTask = null;
         pendingExchanges = 0;
@@ -176,14 +175,14 @@ public class IBatisPollingConsumer exten
         }
 
         // process all the exchanges in this batch
-        processBatch(CastUtils.cast(answer));
+        return processBatch(CastUtils.cast(answer));
     }
 
     public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
         this.maxMessagesPerPoll = maxMessagesPerPoll;
     }
 
-    public void processBatch(Queue<Object> exchanges) throws Exception {
+    public int processBatch(Queue<Object> exchanges) throws Exception {
         final IBatisEndpoint endpoint = getEndpoint();
 
         int total = exchanges.size();
@@ -222,6 +221,8 @@ public class IBatisPollingConsumer exten
                 handleException(e);
             }
         }
+
+        return total;
     }
 
     public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {

Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Thu Jan 13 12:01:05 2011
@@ -71,12 +71,12 @@ public class JpaConsumer extends Schedul
     }
 
     @Override
-    protected void poll() throws Exception {
+    protected int poll() throws Exception {
         // must reset for each poll
         shutdownRunningTask = null;
         pendingExchanges = 0;
 
-        template.execute(new JpaCallback() {
+        Object messagePolled = template.execute(new JpaCallback() {
             public Object doInJpa(EntityManager entityManager) throws PersistenceException {
                 Queue<DataHolder> answer = new LinkedList<DataHolder>();
 
@@ -91,27 +91,26 @@ public class JpaConsumer extends Schedul
                     answer.add(holder);
                 }
 
+                int messagePolled;
                 try {
-                    processBatch(CastUtils.cast(answer));
+                    messagePolled = processBatch(CastUtils.cast(answer));
                 } catch (Exception e) {
                     throw new PersistenceException(e);
                 }
 
                 entityManager.flush();
-                return null;
+                return messagePolled;
             }
         });
+
+        return endpoint.getCamelContext().getTypeConverter().convertTo(int.class, messagePolled);
     }
 
     public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
         this.maxMessagesPerPoll = maxMessagesPerPoll;
     }
 
-    public void processBatch(Queue<Object> exchanges) throws Exception {
-        if (exchanges.isEmpty()) {
-            return;
-        }
-
+    public int processBatch(Queue<Object> exchanges) throws Exception {
         int total = exchanges.size();
 
         // limit if needed
@@ -148,6 +147,8 @@ public class JpaConsumer extends Schedul
                 getDeleteHandler().deleteObject(entityManager, result);
             }
         }
+
+        return total;
     }
 
     public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {

Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Thu Jan 13 12:01:05 2011
@@ -79,10 +79,11 @@ public class MailConsumer extends Schedu
         super.doStop();
     }
 
-    protected void poll() throws Exception {
+    protected int poll() throws Exception {
         // must reset for each poll
         shutdownRunningTask = null;
         pendingExchanges = 0;
+        int polledMessages = 0;
 
         ensureIsConnected();
 
@@ -97,7 +98,7 @@ public class MailConsumer extends Schedu
 
         if (getEndpoint().getConfiguration().getFetchSize() == 0) {
             LOG.warn("Fetch size is 0 meaning the configuration is set to poll no new messages at all. Camel will skip this poll.");
-            return;
+            return 0;
         }
 
         // ensure folder is open
@@ -117,7 +118,7 @@ public class MailConsumer extends Schedu
                     messages = folder.getMessages();
                 }
 
-                processBatch(CastUtils.cast(createExchanges(messages)));
+                polledMessages = processBatch(CastUtils.cast(createExchanges(messages)));
             } else if (count == -1) {
                 throw new MessagingException("Folder: " + folder.getFullName() + " is closed");
             }
@@ -134,13 +135,15 @@ public class MailConsumer extends Schedu
                 LOG.debug("Could not close mailbox folder: " + folder.getName(), e);
             }
         }
+
+        return polledMessages;
     }
 
     public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
         this.maxMessagesPerPoll = maxMessagesPerPoll;
     }
 
-    public void processBatch(Queue<Object> exchanges) throws Exception {
+    public int processBatch(Queue<Object> exchanges) throws Exception {
         int total = exchanges.size();
 
         // limit if needed
@@ -184,6 +187,8 @@ public class MailConsumer extends Schedu
             // process the exchange
             processExchange(exchange);
         }
+
+        return total;
     }
 
     public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {

Modified: camel/trunk/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpOIDPoller.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpOIDPoller.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpOIDPoller.java (original)
+++ camel/trunk/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpOIDPoller.java Thu Jan 13 12:01:05 2011
@@ -111,7 +111,7 @@ public class SnmpOIDPoller extends Sched
     }
 
     @Override
-    protected void poll() throws Exception {
+    protected int poll() throws Exception {
         this.pdu.clear();
         this.pdu.setType(PDU.GET);
 
@@ -122,6 +122,8 @@ public class SnmpOIDPoller extends Sched
 
         // send the request
         snmp.send(pdu, target, null, this);
+
+        return 1;
     }
 
     public void onResponse(ResponseEvent event) {