You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/05 13:46:14 UTC
svn commit: r1197948 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/main/java/org/apache/camel/impl/
components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/
components/camel-aws/src/main/java/o...
Author: davsclaus
Date: Sat Nov 5 12:46:13 2011
New Revision: 1197948
URL: http://svn.apache.org/viewvc?rev=1197948&view=rev
Log:
CAMEL-4605: Fixed an issue when stopping Batch Consumer routes that are configured with ShutdownRunningTask.CompleteAllTasks: there was a slim chance that the route would be shut down too early, before the last batch message had a chance to be enlisted as an in-flight exchange.
Added:
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
camel/trunk/tests/camel-itest/pom.xml
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sat Nov 5 12:46:13 2011
@@ -195,12 +195,24 @@ public abstract class GenericFileConsume
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Sat Nov 5 12:46:13 2011
@@ -52,6 +52,7 @@ public abstract class ScheduledPollConsu
private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
private boolean sendEmptyMessageWhenIdle;
+ private volatile boolean polling;
public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -130,19 +131,25 @@ public abstract class ScheduledPollConsu
LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
}
- boolean begin = pollStrategy.begin(this, getEndpoint());
- if (begin) {
- retryCounter++;
- int polledMessages = poll();
-
- if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
- // send an "empty" exchange
- processEmptyMessage();
+ // mark we are polling which should also include the begin/poll/commit
+ polling = true;
+ try {
+ boolean begin = pollStrategy.begin(this, getEndpoint());
+ if (begin) {
+ retryCounter++;
+ int polledMessages = poll();
+
+ if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
+ // send an "empty" exchange
+ processEmptyMessage();
+ }
+
+ pollStrategy.commit(this, getEndpoint(), polledMessages);
+ } else {
+ LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
}
-
- pollStrategy.commit(this, getEndpoint(), polledMessages);
- } else {
- LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
+ } finally {
+ polling = false;
}
}
@@ -190,6 +197,13 @@ public abstract class ScheduledPollConsu
return isRunAllowed() && !isSuspended();
}
+ /**
+ * Whether polling is currently in progress
+ */
+ protected boolean isPolling() {
+ return polling;
+ }
+
public long getInitialDelay() {
return initialDelay;
}
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java Sat Nov 5 12:46:13 2011
@@ -190,12 +190,24 @@ public class S3Consumer extends Schedule
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java Sat Nov 5 12:46:13 2011
@@ -190,12 +190,24 @@ public class SqsConsumer extends Schedul
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java (original)
+++ camel/trunk/components/camel-ibatis/src/main/java/org/apache/camel/component/ibatis/IBatisConsumer.java Sat Nov 5 12:46:13 2011
@@ -169,12 +169,24 @@ public class IBatisConsumer extends Sche
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java (original)
+++ camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java Sat Nov 5 12:46:13 2011
@@ -141,12 +141,24 @@ public class JcloudsBlobStoreConsumer ex
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
@Override
Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Sat Nov 5 12:46:13 2011
@@ -158,12 +158,24 @@ public class JpaConsumer extends Schedul
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java (original)
+++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java Sat Nov 5 12:46:13 2011
@@ -151,12 +151,24 @@ public class KratiConsumer extends Sched
@Override
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
@Override
Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Sat Nov 5 12:46:13 2011
@@ -211,12 +211,24 @@ public class MailConsumer extends Schedu
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java (original)
+++ camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java Sat Nov 5 12:46:13 2011
@@ -169,12 +169,24 @@ public class MyBatisConsumer extends Sch
}
public int getPendingExchangesSize() {
+ int answer;
// only return the real pending size in case we are configured to complete all tasks
if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
- return pendingExchanges;
+ answer = pendingExchanges;
} else {
- return 0;
+ answer = 0;
}
+
+ if (answer == 0 && isPolling()) {
+ // force at least one pending exchange if we are polling as there is a little gap
+ // in the processBatch method and until an exchange gets enlisted as in-flight
+ // which happens later, so we need to signal back to the shutdown strategy that
+ // there is a pending exchange. When we are no longer polling, then we will return 0
+ log.trace("Currently polling so returning 1 as pending exchanges");
+ answer = 1;
+ }
+
+ return answer;
}
public void prepareShutdown() {
Modified: camel/trunk/tests/camel-itest/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/pom.xml?rev=1197948&r1=1197947&r2=1197948&view=diff
==============================================================================
--- camel/trunk/tests/camel-itest/pom.xml (original)
+++ camel/trunk/tests/camel-itest/pom.xml Sat Nov 5 12:46:13 2011
@@ -100,6 +100,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-quartz</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-rss</artifactId>
<scope>test</scope>
<!-- conflicts with mockmail for unit testing, so we exclude this geronimo spec -->
Added: camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java?rev=1197948&view=auto
==============================================================================
--- camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java (added)
+++ camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/quartz/FtpCronScheduledRoutePolicyTest.java Sat Nov 5 12:46:13 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.quartz;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolProfileBuilder;
+import org.apache.camel.routepolicy.quartz.CronScheduledRoutePolicy;
+import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.filesystem.nativefs.NativeFileSystemFactory;
+import org.apache.ftpserver.ftplet.UserManager;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.usermanager.ClearTextPasswordEncryptor;
+import org.apache.ftpserver.usermanager.impl.PropertiesUserManager;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ *
+ */
+@Ignore("Manual test")
+public class FtpCronScheduledRoutePolicyTest extends CamelTestSupport {
+
+ protected FtpServer ftpServer;
+ private String ftp = "ftp:localhost:20128/myapp?password=admin&username=admin&delay=5s&idempotent=false&localWorkDirectory=target/tmp";
+
+ @Test
+ public void testFtpCronScheduledRoutePolicyTest() throws Exception {
+ template.sendBodyAndHeader("file:res/home/myapp", "Hello World", Exchange.FILE_NAME, "hello.txt");
+
+ Thread.sleep(10 * 1000 * 60);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ CronScheduledRoutePolicy policy = new CronScheduledRoutePolicy();
+ policy.setRouteStartTime("* 0/2 * * * ?");
+ policy.setRouteStopTime("* 1/2 * * * ?");
+ policy.setRouteStopGracePeriod(250);
+ policy.setTimeUnit(TimeUnit.SECONDS);
+
+ ThreadPoolProfile profile = new ThreadPoolProfileBuilder("foo")
+ .poolSize(2).maxPoolSize(2).maxPoolSize(-1).build();
+
+ context.getExecutorServiceManager().registerThreadPoolProfile(profile);
+
+ from(ftp)
+ .noAutoStartup().routePolicy(policy).shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks)
+ .log("Processing ${file:name}")
+ .to("log:done");
+ }
+ };
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+ deleteDirectory("res");
+ createDirectory("res/home/myapp");
+ initFtpServer();
+ ftpServer.start();
+ }
+
+ public void tearDown() throws Exception {
+ super.tearDown();
+ ftpServer.stop();
+ ftpServer = null;
+ }
+
+ protected void initFtpServer() throws Exception {
+ FtpServerFactory serverFactory = new FtpServerFactory();
+
+ // setup user management to read our users.properties and use clear text passwords
+ File file = new File("./src/test/resources/users.properties").getAbsoluteFile();
+ UserManager uman = new PropertiesUserManager(new ClearTextPasswordEncryptor(), file, "admin");
+ serverFactory.setUserManager(uman);
+
+ NativeFileSystemFactory fsf = new NativeFileSystemFactory();
+ fsf.setCreateHome(true);
+ serverFactory.setFileSystem(fsf);
+
+ ListenerFactory factory = new ListenerFactory();
+ factory.setPort(20128);
+ serverFactory.addListener("default", factory.createListener());
+
+ ftpServer = serverFactory.createServer();
+ }
+
+}