You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/01/13 13:01:06 UTC
svn commit: r1058525 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/spi/ camel-core/src/t...
Author: davsclaus
Date: Thu Jan 13 12:01:05 2011
New Revision: 1058525
URL: http://svn.apache.org/viewvc?rev=1058525&view=rev
Log:
CAMEL-3538: PollingConsumerPollStrategy now receives the actual number of messages polled as a parameter of its commit callback. BatchConsumer.processBatch adjusted to return the number of messages polled.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java
- copied, changed from r1058473, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/PollingConsumerPollStrategy.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpSimulateNetworkIssueRecoverTest.java
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerThrowExceptionOnLoginFailedTest.java
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
camel/trunk/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpOIDPoller.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/BatchConsumer.java Thu Jan 13 12:01:05 2011
@@ -46,9 +46,10 @@ public interface BatchConsumer extends C
* data alongside the Exchange.
*
* @param exchanges list of items in this batch
+ * @return number of messages actually processed
* @throws Exception if an internal processing error has occurred.
*/
- void processBatch(Queue<Object> exchanges) throws Exception;
+ int processBatch(Queue<Object> exchanges) throws Exception;
/**
* Whether processing the batch is still allowed.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Thu Jan 13 12:01:05 2011
@@ -59,7 +59,7 @@ public abstract class GenericFileConsume
/**
* Poll for files
*/
- protected void poll() throws Exception {
+ protected int poll() throws Exception {
// must reset for each poll
fileExpressionResult = null;
shutdownRunningTask = null;
@@ -71,7 +71,7 @@ public abstract class GenericFileConsume
if (log.isDebugEnabled()) {
log.debug("Skipping poll as pre poll check returned false");
}
- return;
+ return 0;
}
// gather list of files to process
@@ -118,9 +118,11 @@ public abstract class GenericFileConsume
}
Queue<Exchange> q = exchanges;
- processBatch(CastUtils.cast(q));
+ int polledMessages = processBatch(CastUtils.cast(q));
postPollCheck();
+
+ return polledMessages;
}
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
@@ -128,7 +130,7 @@ public abstract class GenericFileConsume
}
@SuppressWarnings("unchecked")
- public void processBatch(Queue<Object> exchanges) {
+ public int processBatch(Queue<Object> exchanges) {
int total = exchanges.size();
// limit if needed
@@ -162,6 +164,8 @@ public abstract class GenericFileConsume
String key = file.getAbsoluteFilePath();
endpoint.getInProgressRepository().remove(key);
}
+
+ return total;
}
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java Thu Jan 13 12:01:05 2011
@@ -37,7 +37,7 @@ public class DefaultPollingConsumerPollS
return true;
}
- public void commit(Consumer consumer, Endpoint endpoint) {
+ public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
// noop
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java Thu Jan 13 12:01:05 2011
@@ -41,13 +41,20 @@ public class DefaultScheduledPollConsume
super(endpoint, processor, executor);
}
- protected void poll() throws Exception {
+ protected int poll() throws Exception {
+ int messagesPolled = 0;
+
while (isPollAllowed()) {
Exchange exchange = pollingConsumer.receiveNoWait();
if (exchange == null) {
break;
}
+ messagesPolled++;
+ if (log.isTraceEnabled()) {
+ log.trace("Polled " + messagesPolled + " " + exchange);
+ }
+
// if the result of the polled exchange has output we should create a new exchange and
// use the output as input to the next processor
if (exchange.hasOut()) {
@@ -58,6 +65,8 @@ public class DefaultScheduledPollConsume
}
getProcessor().process(exchange);
}
+
+ return messagesPolled;
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LimitedPollingConsumerPollStrategy.java Thu Jan 13 12:01:05 2011
@@ -55,7 +55,7 @@ public class LimitedPollingConsumerPollS
}
@Override
- public void commit(Consumer consumer, Endpoint endpoint) {
+ public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
// we could commit so clear state
state.remove(consumer);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Thu Jan 13 12:01:05 2011
@@ -94,8 +94,8 @@ public abstract class ScheduledPollConsu
boolean begin = pollStrategy.begin(this, getEndpoint());
if (begin) {
retryCounter++;
- poll();
- pollStrategy.commit(this, getEndpoint());
+ int polledMessages = poll();
+ pollStrategy.commit(this, getEndpoint(), polledMessages);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot begin polling as pollStrategy returned false: " + pollStrategy);
@@ -183,10 +183,11 @@ public abstract class ScheduledPollConsu
/**
* The polling method which is invoked periodically to poll this consumer
- *
+ *
+ * @return number of messages polled, will be <tt>0</tt> if no message was polled at all.
* @throws Exception can be thrown if an exception occurred during polling
*/
- protected abstract void poll() throws Exception;
+ protected abstract int poll() throws Exception;
@Override
protected void doStart() throws Exception {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/PollingConsumerPollStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/PollingConsumerPollStrategy.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/PollingConsumerPollStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/PollingConsumerPollStrategy.java Thu Jan 13 12:01:05 2011
@@ -44,8 +44,9 @@ public interface PollingConsumerPollStra
*
* @param consumer the consumer
* @param endpoint the endpoint being consumed
+ * @param polledMessages number of messages polled, will be <tt>0</tt> if no message was polled at all.
*/
- void commit(Consumer consumer, Endpoint endpoint);
+ void commit(Consumer consumer, Endpoint endpoint, int polledMessages);
/**
* Called when poll failed
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyNotBeginTest.java Thu Jan 13 12:01:05 2011
@@ -82,7 +82,7 @@ public class FileConsumerPollStrategyNot
return true;
}
- public void commit(Consumer consumer, Endpoint endpoint) {
+ public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
event += "commit";
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java (from r1058473, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java&r1=1058473&r2=1058525&rev=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyPolledMessagesTest.java Thu Jan 13 12:01:05 2011
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.file;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.Consumer;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
@@ -28,12 +31,12 @@ import org.apache.camel.spi.PollingConsu
/**
* Unit test for poll strategy
*/
-public class FileConsumerPollStrategyTest extends ContextTestSupport {
+public class FileConsumerPollStrategyPolledMessagesTest extends ContextTestSupport {
- private static int counter;
- private static String event = "";
+ private static int maxPolls;
+ private final CountDownLatch latch = new CountDownLatch(1);
- private String fileUrl = "file://target/pollstrategy/?consumer.pollStrategy=#myPoll&noop=true&initialDelay=0&delay=10";
+ private String fileUrl = "file://target/pollstrategy/?consumer.pollStrategy=#myPoll";
@Override
protected JndiRegistry createRegistry() throws Exception {
@@ -46,27 +49,31 @@ public class FileConsumerPollStrategyTes
protected void setUp() throws Exception {
super.setUp();
deleteDirectory("target/pollstrategy");
- template.sendBodyAndHeader("file:target/pollstrategy/", "Hello World", Exchange.FILE_NAME, "hello.txt");
}
- public void testFirstPollRollbackThenCommit() throws Exception {
+ public void testPolledMessages() throws Exception {
+ template.sendBodyAndHeader("file:target/pollstrategy/", "Hello World", Exchange.FILE_NAME, "hello.txt");
+ template.sendBodyAndHeader("file:target/pollstrategy/", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+ // start route now files have been created
+ context.startRoute("foo");
+
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMessageCount(1);
+ mock.expectedMessageCount(2);
assertMockEndpointsSatisfied();
- oneExchangeDone.matchesMockWaitTime();
+ // wait for commit to be issued
+ latch.await(5, TimeUnit.SECONDS);
- // give file consumer a bit time
- Thread.sleep(20);
-
- assertTrue(event.startsWith("rollbackcommit"));
+ assertEquals(2, maxPolls);
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- from(fileUrl).convertBodyTo(String.class).to("mock:result");
+ from(fileUrl).routeId("foo").noAutoStartup()
+ .convertBodyTo(String.class).to("mock:result");
}
};
}
@@ -74,21 +81,17 @@ public class FileConsumerPollStrategyTes
private class MyPollStrategy implements PollingConsumerPollStrategy {
public boolean begin(Consumer consumer, Endpoint endpoint) {
- if (counter++ == 0) {
- // simulate an error on first poll
- throw new IllegalArgumentException("Damn I cannot do this");
- }
return true;
}
- public void commit(Consumer consumer, Endpoint endpoint) {
- event += "commit";
+ public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
+ if (polledMessages > maxPolls) {
+ maxPolls = polledMessages;
+ }
+ latch.countDown();
}
public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception cause) throws Exception {
- if (cause.getMessage().equals("Damn I cannot do this")) {
- event += "rollback";
- }
return false;
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java Thu Jan 13 12:01:05 2011
@@ -70,7 +70,7 @@ public class FileConsumerPollStrategyRol
throw new IllegalArgumentException("Damn I cannot do this");
}
- public void commit(Consumer consumer, Endpoint endpoint) {
+ public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
event += "commit";
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java Thu Jan 13 12:01:05 2011
@@ -87,7 +87,7 @@ public class FileConsumerPollStrategySto
return true;
}
- public void commit(Consumer consumer, Endpoint endpoint) {
+ public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
event += "commit";
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java Thu Jan 13 12:01:05 2011
@@ -81,7 +81,7 @@ public class FileConsumerPollStrategyTes
return true;
}
- public void commit(Consumer consumer, Endpoint endpoint) {
+ public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
event += "commit";
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MockScheduledPollConsumer.java Thu Jan 13 12:01:05 2011
@@ -35,10 +35,11 @@ public class MockScheduledPollConsumer e
}
@Override
- protected void poll() throws Exception {
+ protected int poll() throws Exception {
if (exceptionToThrowOnPoll != null) {
throw exceptionToThrowOnPoll;
}
+ return 0;
}
public void setExceptionToThrowOnPoll(Exception exceptionToThrowOnPoll) {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java Thu Jan 13 12:01:05 2011
@@ -37,7 +37,7 @@ public class ScheduledPollConsumerTest e
return true;
}
- public void commit(Consumer consumer, Endpoint endpoint) {
+ public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
}
public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
@@ -79,7 +79,7 @@ public class ScheduledPollConsumerTest e
return true;
}
- public void commit(Consumer consumer, Endpoint endpoint) {
+ public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
event += "commit";
}
Modified: camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java (original)
+++ camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java Thu Jan 13 12:01:05 2011
@@ -40,15 +40,17 @@ public abstract class FeedEntryPollingCo
this.throttleEntries = throttleEntries;
}
- public void poll() throws Exception {
+ public int poll() throws Exception {
if (feed == null) {
// populate new feed
feed = createFeed();
populateList(feed);
}
+ int polledMessages = 0;
while (hasNextEntry()) {
Object entry = list.get(entryIndex--);
+ polledMessages++;
boolean valid = true;
if (entryFilter != null) {
@@ -59,7 +61,7 @@ public abstract class FeedEntryPollingCo
getProcessor().process(exchange);
if (this.throttleEntries) {
// return and wait for the next poll to continue from last time (this consumer is stateful)
- return;
+ return polledMessages;
}
}
}
@@ -67,6 +69,8 @@ public abstract class FeedEntryPollingCo
// reset feed and list to be able to poll again
feed = null;
resetList();
+
+ return polledMessages;
}
protected abstract EntryFilter createEntryFilter(Date lastUpdate);
Modified: camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java (original)
+++ camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java Thu Jan 13 12:01:05 2011
@@ -32,9 +32,15 @@ public abstract class FeedPollingConsume
this.endpoint = endpoint;
}
- protected void poll() throws Exception {
- Exchange exchange = endpoint.createExchange(createFeed());
- getProcessor().process(exchange);
+ protected int poll() throws Exception {
+ Object feed = createFeed();
+ if (feed != null) {
+ Exchange exchange = endpoint.createExchange(feed);
+ getProcessor().process(exchange);
+ return 1;
+ } else {
+ return 0;
+ }
}
protected abstract Object createFeed() throws Exception;
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Thu Jan 13 12:01:05 2011
@@ -59,7 +59,7 @@ public class SqsConsumer extends Schedul
}
@Override
- protected void poll() throws Exception {
+ protected int poll() throws Exception {
// must reset for each poll
shutdownRunningTask = null;
pendingExchanges = 0;
@@ -71,7 +71,7 @@ public class SqsConsumer extends Schedul
ReceiveMessageResult messageResult = getClient().receiveMessage(request);
Queue<Exchange> exchanges = createExchanges(messageResult.getMessages());
- processBatch(CastUtils.cast(exchanges));
+ return processBatch(CastUtils.cast(exchanges));
}
protected Queue<Exchange> createExchanges(List<Message> messages) {
@@ -88,7 +88,7 @@ public class SqsConsumer extends Schedul
return answer;
}
- public void processBatch(Queue<Object> exchanges) throws Exception {
+ public int processBatch(Queue<Object> exchanges) throws Exception {
int total = exchanges.size();
for (int index = 0; index < total && isBatchAllowed(); index++) {
@@ -124,6 +124,8 @@ public class SqsConsumer extends Schedul
getProcessor().process(exchange);
}
+
+ return total;
}
/**
Modified: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpSimulateNetworkIssueRecoverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpSimulateNetworkIssueRecoverTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpSimulateNetworkIssueRecoverTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpSimulateNetworkIssueRecoverTest.java Thu Jan 13 12:01:05 2011
@@ -72,7 +72,7 @@ public class FromFtpSimulateNetworkIssue
public class MyPollStrategy extends RemoteFilePollingConsumerPollStrategy {
@Override
- public void commit(Consumer consumer, Endpoint endpoint) {
+ public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
counter++;
if (counter < 3) {
throw new IllegalArgumentException("Forced by unit test");
Modified: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerThrowExceptionOnLoginFailedTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerThrowExceptionOnLoginFailedTest.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerThrowExceptionOnLoginFailedTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerThrowExceptionOnLoginFailedTest.java Thu Jan 13 12:01:05 2011
@@ -78,7 +78,7 @@ public class FtpConsumerThrowExceptionOn
return true;
}
- public void commit(Consumer consumer, Endpoint endpoint) {
+ public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
}
public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception cause) throws Exception {
Modified: camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java (original)
+++ camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisPollingConsumer.java Thu Jan 13 12:01:05 2011
@@ -142,8 +142,7 @@ public class IBatisPollingConsumer exten
* Polls the database
*/
@Override
- protected void poll() throws Exception {
-
+ protected int poll() throws Exception {
// must reset for each poll
shutdownRunningTask = null;
pendingExchanges = 0;
@@ -176,14 +175,14 @@ public class IBatisPollingConsumer exten
}
// process all the exchanges in this batch
- processBatch(CastUtils.cast(answer));
+ return processBatch(CastUtils.cast(answer));
}
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
- public void processBatch(Queue<Object> exchanges) throws Exception {
+ public int processBatch(Queue<Object> exchanges) throws Exception {
final IBatisEndpoint endpoint = getEndpoint();
int total = exchanges.size();
@@ -222,6 +221,8 @@ public class IBatisPollingConsumer exten
handleException(e);
}
}
+
+ return total;
}
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Thu Jan 13 12:01:05 2011
@@ -71,12 +71,12 @@ public class JpaConsumer extends Schedul
}
@Override
- protected void poll() throws Exception {
+ protected int poll() throws Exception {
// must reset for each poll
shutdownRunningTask = null;
pendingExchanges = 0;
- template.execute(new JpaCallback() {
+ Object messagePolled = template.execute(new JpaCallback() {
public Object doInJpa(EntityManager entityManager) throws PersistenceException {
Queue<DataHolder> answer = new LinkedList<DataHolder>();
@@ -91,27 +91,26 @@ public class JpaConsumer extends Schedul
answer.add(holder);
}
+ int messagePolled;
try {
- processBatch(CastUtils.cast(answer));
+ messagePolled = processBatch(CastUtils.cast(answer));
} catch (Exception e) {
throw new PersistenceException(e);
}
entityManager.flush();
- return null;
+ return messagePolled;
}
});
+
+ return endpoint.getCamelContext().getTypeConverter().convertTo(int.class, messagePolled);
}
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
- public void processBatch(Queue<Object> exchanges) throws Exception {
- if (exchanges.isEmpty()) {
- return;
- }
-
+ public int processBatch(Queue<Object> exchanges) throws Exception {
int total = exchanges.size();
// limit if needed
@@ -148,6 +147,8 @@ public class JpaConsumer extends Schedul
getDeleteHandler().deleteObject(entityManager, result);
}
}
+
+ return total;
}
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Thu Jan 13 12:01:05 2011
@@ -79,10 +79,11 @@ public class MailConsumer extends Schedu
super.doStop();
}
- protected void poll() throws Exception {
+ protected int poll() throws Exception {
// must reset for each poll
shutdownRunningTask = null;
pendingExchanges = 0;
+ int polledMessages = 0;
ensureIsConnected();
@@ -97,7 +98,7 @@ public class MailConsumer extends Schedu
if (getEndpoint().getConfiguration().getFetchSize() == 0) {
LOG.warn("Fetch size is 0 meaning the configuration is set to poll no new messages at all. Camel will skip this poll.");
- return;
+ return 0;
}
// ensure folder is open
@@ -117,7 +118,7 @@ public class MailConsumer extends Schedu
messages = folder.getMessages();
}
- processBatch(CastUtils.cast(createExchanges(messages)));
+ polledMessages = processBatch(CastUtils.cast(createExchanges(messages)));
} else if (count == -1) {
throw new MessagingException("Folder: " + folder.getFullName() + " is closed");
}
@@ -134,13 +135,15 @@ public class MailConsumer extends Schedu
LOG.debug("Could not close mailbox folder: " + folder.getName(), e);
}
}
+
+ return polledMessages;
}
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
- public void processBatch(Queue<Object> exchanges) throws Exception {
+ public int processBatch(Queue<Object> exchanges) throws Exception {
int total = exchanges.size();
// limit if needed
@@ -184,6 +187,8 @@ public class MailConsumer extends Schedu
// process the exchange
processExchange(exchange);
}
+
+ return total;
}
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
Modified: camel/trunk/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpOIDPoller.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpOIDPoller.java?rev=1058525&r1=1058524&r2=1058525&view=diff
==============================================================================
--- camel/trunk/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpOIDPoller.java (original)
+++ camel/trunk/components/camel-snmp/src/main/java/org/apache/camel/component/snmp/SnmpOIDPoller.java Thu Jan 13 12:01:05 2011
@@ -111,7 +111,7 @@ public class SnmpOIDPoller extends Sched
}
@Override
- protected void poll() throws Exception {
+ protected int poll() throws Exception {
this.pdu.clear();
this.pdu.setType(PDU.GET);
@@ -122,6 +122,8 @@ public class SnmpOIDPoller extends Sched
// send the request
snmp.send(pdu, target, null, this);
+
+ return 1;
}
public void onResponse(ResponseEvent event) {