You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/05 13:56:22 UTC
svn commit: r1197951 - in /camel/branches/camel-2.8.x: ./
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/main/java/org/apache/camel/impl/
components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/
components/camel-a...
Author: davsclaus
Date: Sat Nov 5 12:56:22 2011
New Revision: 1197951
URL: http://svn.apache.org/viewvc?rev=1197951&view=rev
Log:
CAMEL-4605: Fixed an issue when stopping Batch Consumer routes that are configured with CompleteAllTasks: there was a slim chance the route would be shut down too early, before the last batch message had a chance to be enlisted as an in-flight exchange.
Added:
camel/branches/camel-2.8.x/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/
- copied from r1197948, camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/
camel/branches/camel-2.8.x/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java
- copied unchanged from r1197948, camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java
Modified:
camel/branches/camel-2.8.x/ (props changed)
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
camel/branches/camel-2.8.x/tests/camel-itest/pom.xml
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 5 12:56:22 2011
@@ -1 +1 @@
-/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933
+/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sat Nov 5 12:56:22 2011
@@ -196,12 +196,24 @@ public abstract class GenericFileConsume
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Sat Nov 5 12:56:22 2011
@@ -50,6 +50,7 @@ public abstract class ScheduledPollConsu
private boolean useFixedDelay = true;
private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
+ private volatile boolean polling;
public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -128,13 +129,20 @@ public abstract class ScheduledPollConsu
LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
}
- boolean begin = pollStrategy.begin(this, getEndpoint());
- if (begin) {
- retryCounter++;
- int polledMessages = poll();
- pollStrategy.commit(this, getEndpoint(), polledMessages);
- } else {
- LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
+ // mark we are polling which should also include the begin/poll/commit
+ polling = true;
+ try {
+ boolean begin = pollStrategy.begin(this, getEndpoint());
+ if (begin) {
+ retryCounter++;
+ int polledMessages = poll();
+
+ pollStrategy.commit(this, getEndpoint(), polledMessages);
+ } else {
+ LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
+ }
+ } finally {
+ polling = false;
}
}
@@ -171,6 +179,13 @@ public abstract class ScheduledPollConsu
return isRunAllowed() && !isSuspended();
}
+ /**
+ * Whether polling is currently in progress
+ */
+ protected boolean isPolling() {
+ return polling;
+ }
+
public long getInitialDelay() {
return initialDelay;
}
Modified: camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Sat Nov 5 12:56:22 2011
@@ -190,12 +190,24 @@ public class S3Consumer extends Schedule
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Sat Nov 5 12:56:22 2011
@@ -190,12 +190,24 @@ public class SqsConsumer extends Schedul
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java Sat Nov 5 12:56:22 2011
@@ -169,12 +169,24 @@ public class IBatisConsumer extends Sche
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Sat Nov 5 12:56:22 2011
@@ -158,12 +158,24 @@ public class JpaConsumer extends Schedul
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Sat Nov 5 12:56:22 2011
@@ -198,12 +198,24 @@ public class MailConsumer extends Schedu
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java Sat Nov 5 12:56:22 2011
@@ -169,12 +169,24 @@ public class MyBatisConsumer extends Sch
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/branches/camel-2.8.x/tests/camel-itest/pom.xml
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/tests/camel-itest/pom.xml?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/tests/camel-itest/pom.xml (original)
+++ camel/branches/camel-2.8.x/tests/camel-itest/pom.xml Sat Nov 5 12:56:22 2011
@@ -100,6 +100,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-quartz</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-rss</artifactId>
<scope>test</scope>
<!-- conflicts with mockmail for unit testing, so we exclude this geronimo spec -->