You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/05 13:56:22 UTC

svn commit: r1197951 - in /camel/branches/camel-2.8.x: ./ camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/impl/ components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/ components/camel-a...

Author: davsclaus
Date: Sat Nov  5 12:56:22 2011
New Revision: 1197951

URL: http://svn.apache.org/viewvc?rev=1197951&view=rev
Log:
CAMEL-4605: Fixed issue when stopping Batch Consumer routes and have configured route with CompleteAllTask, there may be a slim chance the route will be shutdown too early before the last batch message had a chance to be enlisted as in flight exchange.

Added:
    camel/branches/camel-2.8.x/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/
      - copied from r1197948, camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/
    camel/branches/camel-2.8.x/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java
      - copied unchanged from r1197948, camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java
Modified:
    camel/branches/camel-2.8.x/   (props changed)
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
    camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
    camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
    camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
    camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
    camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
    camel/branches/camel-2.8.x/tests/camel-itest/pom.xml

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov  5 12:56:22 2011
@@ -1 +1 @@
-/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933
+/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sat Nov  5 12:56:22 2011
@@ -196,12 +196,24 @@ public abstract class GenericFileConsume
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/branches/camel-2.8.x/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Sat Nov  5 12:56:22 2011
@@ -50,6 +50,7 @@ public abstract class ScheduledPollConsu
     private boolean useFixedDelay = true;
     private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
     private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
+    private volatile boolean polling;
 
     public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -128,13 +129,20 @@ public abstract class ScheduledPollConsu
                         LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
                     }
 
-                    boolean begin = pollStrategy.begin(this, getEndpoint());
-                    if (begin) {
-                        retryCounter++;
-                        int polledMessages = poll();
-                        pollStrategy.commit(this, getEndpoint(), polledMessages);
-                    } else {
-                        LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
+                    // mark we are polling which should also include the begin/poll/commit
+                    polling = true;
+                    try {
+                        boolean begin = pollStrategy.begin(this, getEndpoint());
+                        if (begin) {
+                            retryCounter++;
+                            int polledMessages = poll();
+
+                            pollStrategy.commit(this, getEndpoint(), polledMessages);
+                        } else {
+                            LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
+                        }
+                    } finally {
+                        polling = false;
                     }
                 }
 
@@ -171,6 +179,13 @@ public abstract class ScheduledPollConsu
         return isRunAllowed() && !isSuspended();
     }
 
+    /**
+     * Whether polling is currently in progress
+     */
+    protected boolean isPolling() {
+        return polling;
+    }
+
     public long getInitialDelay() {
         return initialDelay;
     }

Modified: camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Sat Nov  5 12:56:22 2011
@@ -190,12 +190,24 @@ public class S3Consumer extends Schedule
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Sat Nov  5 12:56:22 2011
@@ -190,12 +190,24 @@ public class SqsConsumer extends Schedul
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java Sat Nov  5 12:56:22 2011
@@ -169,12 +169,24 @@ public class IBatisConsumer extends Sche
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Sat Nov  5 12:56:22 2011
@@ -158,12 +158,24 @@ public class JpaConsumer extends Schedul
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Sat Nov  5 12:56:22 2011
@@ -198,12 +198,24 @@ public class MailConsumer extends Schedu
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java Sat Nov  5 12:56:22 2011
@@ -169,12 +169,24 @@ public class MyBatisConsumer extends Sch
     }
 
     public int getPendingExchangesSize() {
+        int answer;
         // only return the real pending size in case we are configured to complete all tasks
         if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
-            return pendingExchanges;
+            answer = pendingExchanges;
         } else {
-            return 0;
+            answer = 0;
         }
+
+        if (answer == 0 && isPolling()) {
+            // force at least one pending exchange if we are polling as there is a little gap
+            // in the processBatch method and until an exchange gets enlisted as in-flight
+            // which happens later, so we need to signal back to the shutdown strategy that
+            // there is a pending exchange. When we are no longer polling, then we will return 0
+            log.trace("Currently polling so returning 1 as pending exchanges");
+            answer = 1;
+        }
+
+        return answer;
     }
 
     public void prepareShutdown() {

Modified: camel/branches/camel-2.8.x/tests/camel-itest/pom.xml
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/tests/camel-itest/pom.xml?rev=1197951&r1=1197950&r2=1197951&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/tests/camel-itest/pom.xml (original)
+++ camel/branches/camel-2.8.x/tests/camel-itest/pom.xml Sat Nov  5 12:56:22 2011
@@ -100,6 +100,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-quartz</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-rss</artifactId>
             <scope>test</scope>
             <!-- conflicts with mockmail for unit testing, so we exclude this geronimo spec -->