You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/08 10:48:40 UTC
svn commit: r782560 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/main/java/org/apache/camel/component/file/strategy/
camel-core/src/test/java/org/apache/camel/component/file/stress/
components/camel-ftp/sr...
Author: davsclaus
Date: Mon Jun 8 08:48:39 2009
New Revision: 782560
URL: http://svn.apache.org/viewvc?rev=782560&view=rev
Log:
CAMEL-1670: fixed problem with ftp consumer not detecting in progress files.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java (contents, props changed)
- copied, changed from r782538, camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java (contents, props changed)
- copied, changed from r782538, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java
Removed:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java Mon Jun 8 08:48:39 2009
@@ -26,12 +26,9 @@
* <p/>
* Camel supports out of the box the following strategies:
* <ul>
- * <li>GenericFileRenameExclusiveReadLockStrategy waiting until its possible to
- * rename the file.</li>
- * <li>NewFileLockExclusiveReadLockStrategy acquiring a RW file lock for the duration
- * of the processing.</li>
- * <li>NewMarkerFileExclusiveReadLockStrategy using a marker file for acquiring
- * read lock.</li>
+ * <li>FileRenameExclusiveReadLockStrategy waiting until its possible to rename the file.</li>
+ * <li>FileLockExclusiveReadLockStrategy acquiring a RW file lock for the duration of the processing.</li>
+ * <li>MarkerFileExclusiveReadLockStrategy using a marker file for acquiring read lock.</li>
* </ul>
*/
public interface GenericFileExclusiveReadLockStrategy<T> {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java Mon Jun 8 08:48:39 2009
@@ -111,8 +111,8 @@
return true;
}
- public void releaseExclusiveReadLock(GenericFileOperations<File> fileGenericFileOperations,
- GenericFile<File> fileGenericFile, Exchange exchange) throws Exception {
+ public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
+ GenericFile<File> file, Exchange exchange) throws Exception {
FileLock lock = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLock", FileLock.class);
String lockFileName = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLockName", String.class);
Channel channel = lock.channel();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java Mon Jun 8 08:48:39 2009
@@ -48,7 +48,7 @@
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
return strategy;
} else if (moveExpression != null || preMoveExpression != null) {
- GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<File>();
+ FileRenameProcessStrategy<File> strategy = new FileRenameProcessStrategy<File>();
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
if (moveExpression != null) {
GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<File>();
@@ -63,7 +63,7 @@
return strategy;
} else {
// default strategy will move files in a .camel/ subfolder where the file was consumed
- GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<File>();
+ FileRenameProcessStrategy<File> strategy = new FileRenameProcessStrategy<File>();
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
// use context to lookup language to let it be loose coupled
Language language = context.resolveLanguage("file");
Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java (from r782538, camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java&r1=782538&r2=782560&rev=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java Mon Jun 8 08:48:39 2009
@@ -24,11 +24,11 @@
import org.apache.camel.component.file.GenericFileOperationFailedException;
import org.apache.camel.component.file.GenericFileOperations;
-public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrategySupport<T> {
+public class FileRenameProcessStrategy<T> extends GenericFileProcessStrategySupport<T> {
private GenericFileRenamer<T> beginRenamer;
private GenericFileRenamer<T> commitRenamer;
- public GenericFileRenameProcessStrategy() {
+ public FileRenameProcessStrategy() {
}
@Override
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameProcessStrategy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategyFactory.java Mon Jun 8 08:48:39 2009
@@ -47,7 +47,7 @@
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
return strategy;
} else if (moveExpression != null || preMoveExpression != null) {
- GenericFileRenameProcessStrategy strategy = new GenericFileRenameProcessStrategy();
+ FileRenameProcessStrategy strategy = new FileRenameProcessStrategy();
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
if (moveExpression != null) {
GenericFileExpressionRenamer renamer = new GenericFileExpressionRenamer();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java Mon Jun 8 08:48:39 2009
@@ -73,10 +73,10 @@
// delete local work file, if it was used (eg by ftp component)
File local = exchange.getIn().getHeader(Exchange.FILE_LOCAL_WORK_PATH, File.class);
if (local != null && local.exists()) {
+ boolean deleted = local.delete();
if (log.isTraceEnabled()) {
- log.trace("Deleting lock work file: " + local);
+ log.trace("Local work file: " + local + " was deleted: " + deleted);
}
- local.delete();
}
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java Mon Jun 8 08:48:39 2009
@@ -79,7 +79,7 @@
return true;
}
- public void releaseExclusiveReadLock(GenericFileOperations<T> opeations, GenericFile<T> file,
+ public void releaseExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file,
Exchange exchange) throws Exception {
// noop
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java Mon Jun 8 08:48:39 2009
@@ -59,8 +59,8 @@
}
}
- public void releaseExclusiveReadLock(GenericFileOperations<File> fileGenericFileOperations,
- GenericFile<File> fileGenericFile, Exchange exchange) throws Exception {
+ public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
+ GenericFile<File> file, Exchange exchange) throws Exception {
FileLock lock = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLock", FileLock.class);
String lockFileName = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLockName", String.class);
Channel channel = lock.channel();
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java Mon Jun 8 08:48:39 2009
@@ -52,7 +52,7 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // leverage the fact that we can limit to max 25 files per poll
+ // leverage the fact that we can limit to max 50 files per poll
// this will result in polling again and potentially picking up files
// that already are in progress
from("file:target/filestress?maxMessagesPerPoll=50")
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java Mon Jun 8 08:48:39 2009
@@ -59,8 +59,14 @@
} else if (file.isFile()) {
RemoteFile<FTPFile> remote = asRemoteFile(fileName, file);
if (isValidFile(remote, false)) {
- // matched file so add
- fileList.add(remote);
+ if (isInProgress(remote)) {
+ if (log.isTraceEnabled()) {
+ log.trace("Skipping as file is already in progress: " + remote.getFileName());
+ }
+ } else {
+ // matched file so add
+ fileList.add(remote);
+ }
}
} else {
log.debug("Ignoring unsupported remote file type: " + file);
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Mon Jun 8 08:48:39 2009
@@ -61,8 +61,14 @@
} else {
RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(fileName, file);
if (isValidFile(remote, false)) {
- // matched file so add
- fileList.add(remote);
+ if (isInProgress(remote)) {
+ if (log.isTraceEnabled()) {
+ log.trace("Skipping as file is already in progress: " + remote.getFileName());
+ }
+ } else {
+ // matched file so add
+ fileList.add(remote);
+ }
}
}
}
Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java (from r782538, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java&r1=782538&r2=782560&rev=782560&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java Mon Jun 8 08:48:39 2009
@@ -16,59 +16,58 @@
*/
package org.apache.camel.component.file.remote;
-import org.apache.camel.Endpoint;
+import java.util.Random;
+
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.Producer;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
/**
* @version $Revision$
*/
-public class FtpConsumerBodyAsStringTest extends FtpServerTestSupport {
+public class FtpConsumerAsyncStressTest extends FtpServerTestSupport {
+
+ private int files = 100;
private String getFtpUrl() {
- return "ftp://admin@localhost:" + getPort() + "/tmp4/camel?password=admin&consumer.delay=5000";
+ return "ftp://admin@localhost:" + getPort() + "/filestress/?password=admin&maxMessagesPerPoll=25";
}
@Override
protected void setUp() throws Exception {
+ deleteDirectory(FTP_ROOT_DIR + "filestress");
super.setUp();
- prepareFtpServer();
+ for (int i = 0; i < files; i++) {
+ template.sendBodyAndHeader("file://" + FTP_ROOT_DIR + "filestress", "Hello World", Exchange.FILE_NAME, i + ".txt");
+ }
}
- private void prepareFtpServer() throws Exception {
- // prepares the FTP Server by creating a file on the server that we want to unit
- // test that we can pool
- Endpoint endpoint = context.getEndpoint(getFtpUrl());
- Exchange exchange = endpoint.createExchange();
- exchange.getIn().setBody("Hello World");
- exchange.getIn().setHeader(Exchange.FILE_NAME, "hello.txt");
- Producer producer = endpoint.createProducer();
- producer.start();
- producer.process(exchange);
- producer.stop();
- }
-
- public void testSingleFileTest() throws Exception {
+ public void testFTPConsumerAsyncStress() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hello World");
- mock.expectedMessageCount(1);
+ mock.expectedMinimumMessageCount(50);
assertMockEndpointsSatisfied();
}
+ @Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
+ @Override
public void configure() throws Exception {
- from(getFtpUrl()).process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- String body = exchange.getIn().getBody(String.class);
- assertNotNull(body);
- assertEquals("Hello World", body);
- }
- }).to("mock:result");
+ // leverage the fact that we can limit to max 25 files per poll
+ // this will result in polling again and potentially picking up files
+ // that already are in progress
+ from(getFtpUrl())
+ .threads(10)
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ // simulate some work with random time to complete
+ Random ran = new Random();
+ int delay = ran.nextInt(500) + 10;
+ Thread.sleep(delay);
+ }
+ }).to("mock:result");
}
};
}
Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerAsyncStressTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java?rev=782560&r1=782559&r2=782560&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java Mon Jun 8 08:48:39 2009
@@ -93,6 +93,10 @@
public boolean remove(String key) {
return true;
}
+
+ public boolean confirm(String key) {
+ return true;
+ }
}
}
\ No newline at end of file