You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/12/28 10:31:51 UTC
svn commit: r729712 - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/test/java/org/apache/camel/component/file/
camel-core/src/test/resources/
components/camel-ftp/src/main/java/org/apache/camel/compo...
Author: davsclaus
Date: Sun Dec 28 01:31:50 2008
New Revision: 729712
URL: http://svn.apache.org/viewvc?rev=729712&view=rev
Log:
CAMEL-1153: Polished code. Aligned file consumer with ftp consumers, so they are very similar
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java (contents, props changed)
- copied, changed from r729464, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeSingleDirectoryOnlyTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java
activemq/camel/trunk/camel-core/src/test/resources/log4j.properties
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Sun Dec 28 01:31:50 2008
@@ -48,7 +48,13 @@
protected void poll() throws Exception {
// gather list of files to process
List<File> files = new ArrayList<File>();
- scanFilesToPoll(endpoint.getFile(), true, files);
+
+ boolean isDirectory = endpoint.getFile().isDirectory();
+ if (isDirectory) {
+ pollDirectory(endpoint.getFile(), isRecursive(), files);
+ } else {
+ pollFile(endpoint.getFile(), files);
+ }
// sort files using file comparator if provided
if (endpoint.getSorter() != null) {
@@ -83,42 +89,59 @@
}
/**
- * Scans the given file or directory for files to process.
+ * Polls the given directory for files to process
*
- * @param fileOrDirectory current file or directory when doing recursion
- * @param processDir recursive
- * @param fileList current list of files gathered
+ * @param fileOrDirectory current directory or file
+ * @param processDir recursive
+ * @param fileList current list of files gathered
*/
- protected void scanFilesToPoll(File fileOrDirectory, boolean processDir, List<File> fileList) {
+ protected void pollDirectory(File fileOrDirectory, boolean processDir, List<File> fileList) {
if (fileOrDirectory == null || !fileOrDirectory.exists()) {
- // not a file so skip it
return;
}
- if (!fileOrDirectory.isDirectory()) {
- addFile(fileOrDirectory, fileList);
- // must test matching for directories as well as we want to skip directories starting with a dot etc.
- } else if (processDir && matchFile(fileOrDirectory)) {
- // directory that can be recursive
- if (LOG.isTraceEnabled()) {
- LOG.trace("Polling directory " + fileOrDirectory);
- }
- File[] files = fileOrDirectory.listFiles();
- for (File file : files) {
- // recursive scan and add the files
- scanFilesToPoll(file, isRecursive(), fileList);
- }
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Skipping directory " + fileOrDirectory);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling directory: " + fileOrDirectory.getPath());
+ }
+ File[] files = fileOrDirectory.listFiles();
+ for (File file : files) {
+ if (processDir && file.isDirectory()) {
+ if (isValidFile(file)) {
+ // recursive scan and add the sub files and folders
+ pollDirectory(file, isRecursive(), fileList);
+ }
+ } else if (file.isFile()) {
+ if (isValidFile(file)) {
+ // matched file so add
+ fileList.add(file);
+ }
+ } else {
+ LOG.debug("Ignoring unsupported file type " + file);
}
}
}
/**
+ * Polls the given file
+ *
+ * @param file the file
+ * @param fileList current list of files gathered
+ */
+ protected void pollFile(File file, List<File> fileList) {
+ if (file == null || !file.exists()) {
+ return;
+ }
+
+ if (isValidFile(file)) {
+ // matched file so add
+ fileList.add(file);
+ }
+ }
+
+ /**
* Processes the given file
*
- * @param exchange the file exchange
+ * @param exchange the file exchange
*/
protected void processExchange(final FileExchange exchange) {
final File target = exchange.getFile();
@@ -177,12 +200,12 @@
/**
* Strategy when the file was processed and a commit should be executed.
*
- * @param processStrategy the strategy to perform the commit
- * @param exchange the exchange
- * @param file the file processed
- * @param failureHandled is <tt>false</tt> if the exchange was processed succesfully, <tt>true</tt> if
- * an exception occured during processing but it was handled by the failure processor (usually the
- * DeadLetterChannel).
+ * @param processStrategy the strategy to perform the commit
+ * @param exchange the exchange
+ * @param file the file processed
+ * @param failureHandled is <tt>false</tt> if the exchange was processed successfully, <tt>true</tt> if
+ * an exception occurred during processing but it was handled by the failure processor (usually the
+ * DeadLetterChannel).
*/
protected void processStrategyCommit(FileProcessStrategy processStrategy, FileExchange exchange,
File file, boolean failureHandled) {
@@ -191,7 +214,7 @@
// use file.getPath as key for the idempotent repository to support files with same name but in different folders
endpoint.getIdempotentRepository().add(file.getPath());
}
-
+
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Committing file strategy: " + processStrategy + " for file: "
@@ -207,9 +230,9 @@
/**
* Strategy when the file was not processed and a rollback should be executed.
*
- * @param processStrategy the strategy to perform the commit
- * @param exchange the exchange
- * @param file the file processed
+ * @param processStrategy the strategy to perform the commit
+ * @param exchange the exchange
+ * @param file the file processed
*/
protected void processStrategyRollback(FileProcessStrategy processStrategy, FileExchange exchange, File file) {
if (LOG.isDebugEnabled()) {
@@ -220,11 +243,12 @@
/**
* Strategy for validating if the given file should be included or not
- * @param file the file
+ *
+ * @param file the file
* @return true to include the file, false to skip it
*/
- protected boolean validateFile(File file) {
- if (!matchFile(file)) {
+ protected boolean isValidFile(File file) {
+ if (!isMatched(file)) {
if (LOG.isTraceEnabled()) {
LOG.trace("File did not match. Will skip this file: " + file);
}
@@ -244,15 +268,15 @@
* <p/>
* Will always return <tt>false</tt> for certain files/folders:
* <ul>
- * <li>Starting with a dot</li>
- * <li>lock files</li>
+ * <li>Starting with a dot</li>
+ * <li>lock files</li>
* </ul>
* And then <tt>true</tt> for directories.
*
- * @param file the file
- * @return true if the file is matche, false if not
+ * @param file the file
+ * @return true if the file is matched, false if not
*/
- protected boolean matchFile(File file) {
+ protected boolean isMatched(File file) {
String name = file.getName();
// folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
@@ -295,12 +319,6 @@
return true;
}
- private void addFile(File file, List<File> fileList) {
- if (validateFile(file)) {
- fileList.add(file);
- }
- }
-
public boolean isRecursive() {
return this.recursive;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java Sun Dec 28 01:31:50 2008
@@ -183,7 +183,7 @@
}
public boolean isAutoCreate() {
- return this.autoCreate;
+ return autoCreate;
}
public void setAutoCreate(boolean autoCreate) {
@@ -193,7 +193,9 @@
public FileProcessStrategy getFileStrategy() {
if (fileProcessStrategy == null) {
fileProcessStrategy = createFileStrategy();
- LOG.debug("Using file process strategy: " + fileProcessStrategy);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using file process strategy: " + fileProcessStrategy);
+ }
}
return fileProcessStrategy;
}
Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java (from r729464, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeSingleDirectoryOnlyTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeSingleDirectoryOnlyTest.java&r1=729464&r2=729712&rev=729712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeSingleDirectoryOnlyTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java Sun Dec 28 01:31:50 2008
@@ -16,37 +16,61 @@
*/
package org.apache.camel.component.file;
+import java.io.File;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
/**
- * Unit test for consuming the single directory only.
+ * Unit test for consuming multiple directories.
*/
-public class FileConsumeSingleDirectoryOnlyTest extends ContextTestSupport {
+public class FileConsumeMultipleDirectoriesTest extends ContextTestSupport {
+
+ private String fileUrl = "file://target/multidir/?consumer.recursive=true&delete=true&consumer.delay=5000&sortBy=file:path";
@Override
protected void setUp() throws Exception {
super.setUp();
- deleteDirectory("target/singledirectoryonly");
- template.sendBodyAndHeader("file://target/singledirectoryonly", "Hello World", FileComponent.HEADER_FILE_NAME, "report.txt");
- template.sendBodyAndHeader("file://target/singledirectoryonly", "Bye World", FileComponent.HEADER_FILE_NAME, "report2.txt");
- template.sendBodyAndHeader("file://target/singledirectoryonly/2008", "2008 Report", FileComponent.HEADER_FILE_NAME, "report2008.txt");
+ deleteDirectory("target/multidir");
+ template.sendBodyAndHeader(fileUrl, "Bye World", FileComponent.HEADER_FILE_NAME, "bye.txt");
+ template.sendBodyAndHeader(fileUrl, "Hello World", FileComponent.HEADER_FILE_NAME, "sub/hello.txt");
+ template.sendBodyAndHeader(fileUrl, "Godday World", FileComponent.HEADER_FILE_NAME, "sub/sub2/godday.txt");
}
- public void testConsumeFileOnly() throws Exception {
+ public void testMultiDir() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+ mock.expectedBodiesReceived("Bye World", "Hello World", "Godday World");
assertMockEndpointsSatisfied();
+
+ FileExchange exchange = (FileExchange) mock.getExchanges().get(0);
+ File file = exchange.getFile();
+ assertFilePath("target/multidir/bye.txt", file.getPath());
+ assertEquals("bye.txt", file.getName());
+
+ exchange = (FileExchange) mock.getExchanges().get(1);
+ file = exchange.getFile();
+ assertFilePath("target/multidir/sub/hello.txt", file.getPath());
+ assertEquals("hello.txt", file.getName());
+
+ exchange = (FileExchange) mock.getExchanges().get(2);
+ file = exchange.getFile();
+ assertFilePath("target/multidir/sub/sub2/godday.txt", file.getPath());
+ assertEquals("godday.txt", file.getName());
+ }
+
+ private static void assertFilePath(String expected, String actual) {
+ actual = actual.replaceAll("\\\\", "/");
+ assertEquals(expected, actual);
}
- @Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- from("file://target/singledirectoryonly/?consumer.recursive=false&delete=true").to("mock:result");
+ from(fileUrl).to("mock:result");
}
};
}
-}
+
+}
\ No newline at end of file
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java
------------------------------------------------------------------------------
svn:mergeinfo =
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java Sun Dec 28 01:31:50 2008
@@ -36,7 +36,9 @@
public void testManually() throws Exception {
deleteDirectory("./target/exclusiveread");
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.setSleepForEmptyTest(10000);
+ // this is used for manual testing where you can copy/lock files etc. while this test runs
+ //mock.setSleepForEmptyTest(10 * 1000L);
+ mock.setSleepForEmptyTest(100);
mock.expectedMessageCount(0);
mock.assertIsSatisfied();
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java Sun Dec 28 01:31:50 2008
@@ -18,7 +18,6 @@
import java.io.File;
import java.io.FileOutputStream;
-import java.nio.channels.FileLock;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
@@ -51,7 +50,6 @@
deleteDirectory("./target/exclusiveread");
createDirectory("./target/exclusiveread/slowfile");
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hello World");
mock.expectedMessageCount(1);
// send a message to seda:start to trigger the creating of the slowfile to poll
@@ -67,7 +65,7 @@
private class MySlowFileProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
- LOG.info("Creating a slow fil with no locks...");
+ LOG.info("Creating a slow file with no locks...");
File file = new File("./target/exclusiveread/slowfile/hello.txt");
FileOutputStream fos = new FileOutputStream(file);
fos.write("Hello World".getBytes());
Modified: activemq/camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ activemq/camel/trunk/camel-core/src/test/resources/log4j.properties Sun Dec 28 01:31:50 2008
@@ -18,7 +18,7 @@
#
# The logging properties used during tests..
#
-log4j.rootLogger=INFO, out
+log4j.rootLogger=INFO, file
log4j.logger.org.apache.activemq.spring=WARN
log4j.logger.org.apache.camel=DEBUG
@@ -26,13 +26,13 @@
log4j.logger.org.apache.camel.impl.converter=WARN
# CONSOLE appender not used by default
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-
-# File appender
-log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out=org.apache.log4j.ConsoleAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.out.file=target/camel-core-test.log
-log4j.appender.out.append=true
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-core-test.log
+log4j.appender.file.append=true
Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java (original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java Sun Dec 28 01:31:50 2008
@@ -45,15 +45,19 @@
List<FTPFile> files = operations.listFiles(fileName);
for (FTPFile file : files) {
RemoteFile<FTPFile> remote = asRemoteFile(fileName, file);
- if (processDir && file.isDirectory() && isValidFile(remote, true)) {
- // recursive scan and add the sub files and folders
- String directory = fileName + "/" + file.getName();
- pollDirectory(directory, endpoint.isRecursive(), fileList);
- } else if (file.isFile() && isValidFile(remote, false)) {
- // matched file so add
- fileList.add(remote);
+ if (processDir && file.isDirectory()) {
+ if (isValidFile(remote, true)) {
+ // recursive scan and add the sub files and folders
+ String directory = fileName + "/" + file.getName();
+ pollDirectory(directory, endpoint.isRecursive(), fileList);
+ }
+ } else if (file.isFile()) {
+ if (isValidFile(remote, false)) {
+ // matched file so add
+ fileList.add(remote);
+ }
} else {
- log.debug("Ignoring unsupported file type " + file);
+ log.debug("Ignoring unsupported remote file type: " + file);
}
}
}
@@ -69,7 +73,10 @@
FTPFile file = list.get(0);
if (file != null) {
RemoteFile remoteFile = asRemoteFile(directory, file);
- fileList.add(remoteFile);
+ if (isValidFile(remoteFile, false)) {
+ // matched file so add
+ fileList.add(remoteFile);
+ }
}
}
Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java (original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Sun Dec 28 01:31:50 2008
@@ -48,8 +48,59 @@
this.operations = operations;
}
+ protected void poll() throws Exception {
+ connectIfNecessary();
+ if (!loggedIn) {
+ String message = "Could not connect/login to: " + endpoint.remoteServerInformation() + ". Will skip this poll.";
+ log.warn(message);
+ return;
+ }
+
+ // gather list of files to process
+ List<RemoteFile> files = new ArrayList<RemoteFile>();
+
+ String name = endpoint.getConfiguration().getFile();
+ boolean isDirectory = endpoint.getConfiguration().isDirectory();
+ if (isDirectory) {
+ pollDirectory(name, endpoint.isRecursive(), files);
+ } else {
+ pollFile(name, files);
+ }
+
+ // sort files using file comparator if provided
+ if (endpoint.getSorter() != null) {
+ Collections.sort(files, endpoint.getSorter());
+ }
+
+ // sort using built-in sorters that are expression based
+ // first we need to convert to RemoteFileExchange objects so we can sort using expressions
+ List<RemoteFileExchange> exchanges = new ArrayList<RemoteFileExchange>(files.size());
+ for (RemoteFile file : files) {
+ RemoteFileExchange exchange = endpoint.createExchange(file);
+ endpoint.configureMessage(file, exchange.getIn());
+ exchanges.add(exchange);
+ }
+ // sort files using exchange comparator if provided
+ if (endpoint.getSortBy() != null) {
+ Collections.sort(exchanges, endpoint.getSortBy());
+ }
+
+ // consume files one by one
+ int total = exchanges.size();
+ if (total > 0 && log.isDebugEnabled()) {
+ log.debug("Total " + total + " files to consume");
+ }
+ for (int index = 0; index < total; index++) {
+ RemoteFileExchange exchange = exchanges.get(index);
+ // add current index and total as headers
+ exchange.getIn().setHeader(FileComponent.HEADER_FILE_BATCH_INDEX, index);
+ exchange.getIn().setHeader(FileComponent.HEADER_FILE_BATCH_TOTAL, total);
+ processExchange(exchange);
+ }
+ }
+
/**
- * Polls the given directory for files to process.
+ * Polls the given directory for files to process
*
* @param fileName current directory or file
* @param processDir recursive
@@ -183,12 +234,12 @@
*
* @param file the remote file
* @param isDirectory wether the file is a directory or a file
- * @return <tttrue</tt> to include the file, <tt>false</tt> to skip it
+ * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
*/
protected boolean isValidFile(RemoteFile file, boolean isDirectory) {
if (!isMatched(file, isDirectory)) {
if (log.isTraceEnabled()) {
- log.trace("File did not match. Will skip this file: " + file);
+ log.trace("Remote file did not match. Will skip this remote file: " + file);
}
return false;
}
@@ -198,11 +249,18 @@
}
/**
- * Is the given file matched to be consumed.
+ * Strategy to perform file matching based on endpoint configuration.
+ * <p/>
+ * Will always return <tt>false</tt> for certain files/folders:
+ * <ul>
+ * <li>Starting with a dot</li>
+ * <li>lock files</li>
+ * </ul>
+ * And then <tt>true</tt> for directories.
*
* @param file the remote file
- * @param isDirectory is the given file a directory or a file
- * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
+ * @param isDirectory whether the file is a directory or a file
+ * @return <tt>true</tt> if the remote file is matched, <tt>false</tt> if not
*/
protected boolean isMatched(RemoteFile file, boolean isDirectory) {
String name = file.getFileName();
@@ -285,55 +343,4 @@
protected String remoteServer() {
return endpoint.remoteServerInformation();
}
-
- protected void poll() throws Exception {
- connectIfNecessary();
- if (!loggedIn) {
- String message = "Could not connect/login to: " + endpoint.remoteServerInformation() + ". Will skip this poll.";
- log.warn(message);
- return;
- }
-
- // gather list of files to process
- List<RemoteFile> files = new ArrayList<RemoteFile>();
-
- String name = endpoint.getConfiguration().getFile();
- boolean isDirectory = endpoint.getConfiguration().isDirectory();
- if (isDirectory) {
- pollDirectory(name, true, files);
- } else {
- pollFile(name, files);
- }
-
- // sort files using file comparator if provided
- if (endpoint.getSorter() != null) {
- Collections.sort(files, endpoint.getSorter());
- }
-
- // sort using build in sorters that is expression based
- // first we need to convert to RemoteFileExchange objects so we can sort using expressions
- List<RemoteFileExchange> exchanges = new ArrayList<RemoteFileExchange>(files.size());
- for (RemoteFile file : files) {
- RemoteFileExchange exchange = endpoint.createExchange(file);
- endpoint.configureMessage(file, exchange.getIn());
- exchanges.add(exchange);
- }
- // sort files using exchange comparator if provided
- if (endpoint.getSortBy() != null) {
- Collections.sort(exchanges, endpoint.getSortBy());
- }
-
- // consume files one by one
- int total = exchanges.size();
- if (total > 0 && log.isDebugEnabled()) {
- log.debug("Total " + total + " files to consume");
- }
- for (int index = 0; index < total; index++) {
- RemoteFileExchange exchange = exchanges.get(index);
- // add current index and total as headers
- exchange.getIn().setHeader(FileComponent.HEADER_FILE_BATCH_INDEX, index);
- exchange.getIn().setHeader(FileComponent.HEADER_FILE_BATCH_TOTAL, total);
- processExchange(exchange);
- }
- }
}
Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java (original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Sun Dec 28 01:31:50 2008
@@ -35,27 +35,30 @@
return;
}
- String currentDir = operations.getCurrentDirectory();
- operations.changeCurrentDirectory(fileName);
+ if (fileName.endsWith("/")) {
+ fileName = fileName.substring(0, fileName.length() - 1);
+ }
if (log.isTraceEnabled()) {
log.trace("Polling directory: " + fileName);
}
- List<ChannelSftp.LsEntry> files = operations.listFiles();
+ List<ChannelSftp.LsEntry> files = operations.listFiles(fileName);
for (ChannelSftp.LsEntry file : files) {
- RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(file);
- if (processDir && file.getAttrs().isDir() && isValidFile(remote, true)) {
- // recursive scan and add the sub files and folders
- pollDirectory(file.getFilename(), endpoint.isRecursive(), fileList);
- } else if (!file.getAttrs().isLink() && isValidFile(remote, false)) {
- // matched file so add
- fileList.add(remote);
+ RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(fileName, file);
+ if (processDir && file.getAttrs().isDir()) {
+ if (isValidFile(remote, true)) {
+ // recursive scan and add the sub files and folders
+ pollDirectory(file.getFilename(), endpoint.isRecursive(), fileList);
+ }
+ } else if (!file.getAttrs().isLink()) {
+ if (isValidFile(remote, false)) {
+ // matched file so add
+ fileList.add(remote);
+ }
} else {
- log.debug("Ignoring unsupported file type " + file);
+ log.debug("Ignoring unsupported remote file type: " + file);
}
}
-
- operations.changeCurrentDirectory(currentDir);
}
/**
@@ -65,33 +68,36 @@
* @param fileList current list of files gathered
*/
protected void pollFile(String fileName, List<RemoteFile> fileList) {
+ String directory = ".";
int index = fileName.lastIndexOf("/");
if (index > -1) {
- // cd to the folder of the filename
- operations.changeCurrentDirectory(fileName.substring(0, index));
+ directory = fileName.substring(0, index);
}
// list the files in the fold and poll the first file
- List<ChannelSftp.LsEntry> list = operations.listFiles(fileName.substring(index + 1));
+ List<ChannelSftp.LsEntry> list = operations.listFiles(fileName);
ChannelSftp.LsEntry file = list.get(0);
if (file != null) {
- RemoteFile remoteFile = asRemoteFile(file);
- fileList.add(remoteFile);
+ RemoteFile remoteFile = asRemoteFile(directory, file);
+ if (isValidFile(remoteFile, false)) {
+ // matched file so add
+ fileList.add(remoteFile);
+ }
}
}
- private RemoteFile<ChannelSftp.LsEntry> asRemoteFile(ChannelSftp.LsEntry file) {
+ private RemoteFile<ChannelSftp.LsEntry> asRemoteFile(String directory, ChannelSftp.LsEntry file) {
RemoteFile<ChannelSftp.LsEntry> remote = new RemoteFile<ChannelSftp.LsEntry>();
remote.setFile(file);
remote.setFileName(file.getFilename());
remote.setFileLength(file.getAttrs().getSize());
remote.setLastModified(file.getAttrs().getMTime() * 1000L);
remote.setHostname(endpoint.getConfiguration().getHost());
- String absoluteFileName = getAbsoluteFileName(file);
+ String absoluteFileName = directory + "/" + file.getFilename();
remote.setAbsolutelFileName(absoluteFileName);
// the relative filename
String ftpBasePath = endpoint.getConfiguration().getFile();
- String relativePath = absoluteFileName.substring(ftpBasePath.length() + 1);
+ String relativePath = absoluteFileName.substring(ftpBasePath.length());
if (relativePath.startsWith("/")) {
relativePath = relativePath.substring(1);
}
@@ -100,8 +106,4 @@
return remote;
}
- private String getAbsoluteFileName(ChannelSftp.LsEntry ftpFile) {
- return operations.getCurrentDirectory() + "/" + ftpFile.getFilename();
- }
-
}