You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dk...@apache.org on 2011/09/19 19:47:37 UTC
svn commit: r1172730 - in /camel/branches/camel-2.8.x: ./
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/test/java/org/apache/camel/component/file/
Author: dkulp
Date: Mon Sep 19 17:47:36 2011
New Revision: 1172730
URL: http://svn.apache.org/viewvc?rev=1172730&view=rev
Log:
Merged revisions 1155292 via svnmerge from
https://svn.apache.org/repos/asf/camel/trunk
........
r1155292 | davsclaus | 2011-08-09 05:58:25 -0400 (Tue, 09 Aug 2011) | 1 line
CAMEL-4318: Scheduled poll consumer from pollEnrich should not be polling when route starts, but on demand
........
Added:
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java
- copied unchanged from r1155292, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichNoWaitTest.java
Modified:
camel/branches/camel-2.8.x/ (props changed)
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 17:47:36 2011
@@ -1 +1 @@
-/camel/trunk:1148706,1148710,1149570,1150300-1150311,1150651,1151000,1151054,1151087,1151126,1151362,1152170,1152569,1152733,1152755,1152868,1153620,1153812,1153829,1154684,1155230,1156108,1156260,1156277,1156479,1156524,1157348,1157749,1157798,1157831,1157878,1158153,1158230,1158295,1159171,1159174,1159326,1159457,1159460,1159606,1159682-1159683,1159867,1160547,1160637,1161010,1161082,1161524,1162309,1162395,1163231,1163420,1163656-1163669,1163725,1164343,1164544,1164557,1164633,1164972-1165000,1165152,1165157,1165658,1165971,1165987,1167098,1167131,1167448,1167487,1167555,1169610,1169620,1170122,1170226,1170397,1170956,1171396,1171755,1171941,1171947
+/camel/trunk:1148706,1148710,1149570,1150300-1150311,1150651,1151000,1151054,1151087,1151126,1151362,1152170,1152569,1152733,1152755,1152868,1153620,1153812,1153829,1154684,1155230,1155292,1156108,1156260,1156277,1156479,1156524,1157348,1157749,1157798,1157831,1157878,1158153,1158230,1158295,1159171,1159174,1159326,1159457,1159460,1159606,1159682-1159683,1159867,1160547,1160637,1161010,1161082,1161524,1162309,1162395,1163231,1163420,1163656-1163669,1163725,1164343,1164544,1164557,1164633,1164972-1165000,1165152,1165157,1165658,1165971,1165987,1167098,1167131,1167448,1167487,1167555,1169610,1169620,1170122,1170226,1170397,1170956,1171396,1171755,1171941,1171947
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java?rev=1172730&r1=1172729&r2=1172730&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java Mon Sep 19 17:47:36 2011
@@ -36,9 +36,12 @@ public interface PollingConsumerPollingS
/**
* Callback invoked before the poll.
*
+ * @param timeout the timeout
* @throws Exception can be thrown if error occurred
+ * @return timeout to be used, this allows returning a higher timeout value
+ * to ensure at least one poll is being performed
*/
- void beforePoll() throws Exception;
+ long beforePoll(long timeout) throws Exception;
/**
* Callback invoked after the poll.
Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java?rev=1172730&r1=1172729&r2=1172730&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java Mon Sep 19 17:47:36 2011
@@ -65,7 +65,8 @@ public class EventDrivenPollingConsumer
while (isRunAllowed()) {
try {
- beforePoll();
+ beforePoll(0);
+ // take will block waiting for message
return queue.take();
} catch (InterruptedException e) {
handleInterruptedException(e);
@@ -83,13 +84,9 @@ public class EventDrivenPollingConsumer
throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
}
- // if the queue is empty and there is no wait then return null
- if (timeout == 0 && queue.isEmpty()) {
- return null;
- }
-
try {
- beforePoll();
+ // use the timeout value returned from beforePoll
+ timeout = beforePoll(timeout);
return queue.poll(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
handleInterruptedException(e);
@@ -115,15 +112,16 @@ public class EventDrivenPollingConsumer
getInterruptedExceptionHandler().handleException(e);
}
- protected void beforePoll() {
+ protected long beforePoll(long timeout) {
if (consumer instanceof PollingConsumerPollingStrategy) {
PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
try {
- strategy.beforePoll();
+ timeout = strategy.beforePoll(timeout);
} catch (Exception e) {
LOG.debug("Error occurred before polling " + consumer + ". This exception will be ignored.", e);
}
}
+ return timeout;
}
protected void afterPoll() {
Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1172730&r1=1172729&r2=1172730&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Mon Sep 19 17:47:36 2011
@@ -275,20 +275,28 @@ public abstract class ScheduledPollConsu
@Override
public void onInit() throws Exception {
- // start our self
- ServiceHelper.startService(this);
+ // noop
}
@Override
- public void beforePoll() throws Exception {
- // resume our self
- ServiceHelper.resumeService(this);
+ public long beforePoll(long timeout) throws Exception {
+ LOG.trace("Before poll {}", getEndpoint());
+ // resume or start our self
+ if (!ServiceHelper.resumeService(this)) {
+ ServiceHelper.startService(this);
+ };
+
+ // ensure at least timeout is as long as one poll delay
+ return Math.max(timeout, getDelay());
}
@Override
public void afterPoll() throws Exception {
- // suspend our self
- ServiceHelper.suspendService(this);
+ LOG.trace("After poll {}", getEndpoint());
+ // suspend or stop our self
+ if (!ServiceHelper.suspendService(this)) {
+ ServiceHelper.stopService(this);
+ }
}
}
Modified: camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java?rev=1172730&r1=1172729&r2=1172730&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java Mon Sep 19 17:47:36 2011
@@ -56,9 +56,11 @@ public class FilePollEnrichTest extends
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("timer:foo?period=2000").routeId("foo")
+ from("timer:foo?period=1000").routeId("foo")
+ .log("Trigger timer foo")
.pollEnrich("file:target/pollenrich?move=done", 5000)
.convertBodyTo(String.class)
+ .log("Polled filed ${file:name}")
.to("mock:result")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {