You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/08/09 11:58:26 UTC
svn commit: r1155292 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/ main/java/org/apache/camel/impl/
test/java/org/apache/camel/component/file/
Author: davsclaus
Date: Tue Aug 9 09:58:25 2011
New Revision: 1155292
URL: http://svn.apache.org/viewvc?rev=1155292&view=rev
Log:
CAMEL-4318: Scheduled poll consumer from pollEnrich should not be polling when route starts, but on demand
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java
- copied, changed from r1155236, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java?rev=1155292&r1=1155291&r2=1155292&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java Tue Aug 9 09:58:25 2011
@@ -36,9 +36,12 @@ public interface PollingConsumerPollingS
/**
* Callback invoked before the poll.
*
+ * @param timeout the timeout (in milliseconds) requested by the caller
* @throws Exception can be thrown if error occurred
+ * @return the timeout to use; a higher value than the one passed in may be returned
+ * to ensure that at least one poll is performed
*/
- void beforePoll() throws Exception;
+ long beforePoll(long timeout) throws Exception;
/**
* Callback invoked after the poll.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java?rev=1155292&r1=1155291&r2=1155292&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java Tue Aug 9 09:58:25 2011
@@ -65,7 +65,8 @@ public class EventDrivenPollingConsumer
while (isRunAllowed()) {
try {
- beforePoll();
+ beforePoll(0);
+ // take will block waiting for message
return queue.take();
} catch (InterruptedException e) {
handleInterruptedException(e);
@@ -83,13 +84,9 @@ public class EventDrivenPollingConsumer
throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
}
- // if the queue is empty and there is no wait then return null
- if (timeout == 0 && queue.isEmpty()) {
- return null;
- }
-
try {
- beforePoll();
+ // use the timeout value returned from beforePoll
+ timeout = beforePoll(timeout);
return queue.poll(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
handleInterruptedException(e);
@@ -115,15 +112,16 @@ public class EventDrivenPollingConsumer
getInterruptedExceptionHandler().handleException(e);
}
- protected void beforePoll() {
+ protected long beforePoll(long timeout) {
if (consumer instanceof PollingConsumerPollingStrategy) {
PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
try {
- strategy.beforePoll();
+ timeout = strategy.beforePoll(timeout);
} catch (Exception e) {
LOG.debug("Error occurred before polling " + consumer + ". This exception will be ignored.", e);
}
}
+ return timeout;
}
protected void afterPoll() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1155292&r1=1155291&r2=1155292&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Tue Aug 9 09:58:25 2011
@@ -276,20 +276,28 @@ public abstract class ScheduledPollConsu
@Override
public void onInit() throws Exception {
- // start our self
- ServiceHelper.startService(this);
+ // noop
}
@Override
- public void beforePoll() throws Exception {
- // resume our self
- ServiceHelper.resumeService(this);
+ public long beforePoll(long timeout) throws Exception {
+ LOG.trace("Before poll {}", getEndpoint());
+ // resume or start our self
+ if (!ServiceHelper.resumeService(this)) {
+ ServiceHelper.startService(this);
+ };
+
+ // ensure the timeout is at least as long as one poll delay
+ return Math.max(timeout, getDelay());
}
@Override
public void afterPoll() throws Exception {
- // suspend our self
- ServiceHelper.suspendService(this);
+ LOG.trace("After poll {}", getEndpoint());
+ // suspend or stop our self
+ if (!ServiceHelper.suspendService(this)) {
+ ServiceHelper.stopService(this);
+ }
}
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java (from r1155236, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java&r1=1155236&r2=1155292&rev=1155292&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java Tue Aug 9 09:58:25 2011
@@ -28,7 +28,7 @@ import org.apache.camel.component.mock.M
/**
* @version
*/
-public class FilePollEnrichTest extends ContextTestSupport {
+public class FilePollEnrichNoWaitTest extends ContextTestSupport {
@Override
protected void setUp() throws Exception {
@@ -36,7 +36,7 @@ public class FilePollEnrichTest extends
super.setUp();
}
- public void testFilePollEnrich() throws Exception {
+ public void testFilePollEnrichNoWait() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Hello World");
mock.expectedFileExists("target/pollenrich/done/hello.txt");
@@ -56,9 +56,14 @@ public class FilePollEnrichTest extends
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("timer:foo?period=2000").routeId("foo")
- .pollEnrich("file:target/pollenrich?move=done", 5000)
+ from("timer:foo?period=1000").routeId("foo")
+ .log("Trigger timer foo")
+ .pollEnrich("file:target/pollenrich?move=done")
.convertBodyTo(String.class)
+ .filter(body().isNull())
+ .stop()
+ .end()
+ .log("Polled filed ${file:name}")
.to("mock:result")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java?rev=1155292&r1=1155291&r2=1155292&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java Tue Aug 9 09:58:25 2011
@@ -56,9 +56,11 @@ public class FilePollEnrichTest extends
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("timer:foo?period=2000").routeId("foo")
+ from("timer:foo?period=1000").routeId("foo")
+ .log("Trigger timer foo")
.pollEnrich("file:target/pollenrich?move=done", 5000)
.convertBodyTo(String.class)
+ .log("Polled filed ${file:name}")
.to("mock:result")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {