You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2008/12/28 10:31:51 UTC

svn commit: r729712 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/test/java/org/apache/camel/component/file/ camel-core/src/test/resources/ components/camel-ftp/src/main/java/org/apache/camel/compo...

Author: davsclaus
Date: Sun Dec 28 01:31:50 2008
New Revision: 729712

URL: http://svn.apache.org/viewvc?rev=729712&view=rev
Log:
CAMEL-1153: Polished code. Aligned file consumer with ftp consumers, so they are very similar

Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java   (contents, props changed)
      - copied, changed from r729464, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeSingleDirectoryOnlyTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java
    activemq/camel/trunk/camel-core/src/test/resources/log4j.properties
    activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
    activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
    activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Sun Dec 28 01:31:50 2008
@@ -48,7 +48,13 @@
     protected void poll() throws Exception {
         // gather list of files to process
         List<File> files = new ArrayList<File>();
-        scanFilesToPoll(endpoint.getFile(), true, files);
+
+        boolean isDirectory = endpoint.getFile().isDirectory();
+        if (isDirectory) {
+            pollDirectory(endpoint.getFile(), isRecursive(), files);
+        } else {
+            pollFile(endpoint.getFile(), files);
+        }
 
         // sort files using file comparator if provided
         if (endpoint.getSorter() != null) {
@@ -83,42 +89,59 @@
     }
 
     /**
-     * Scans the given file or directory for files to process.
+     * Polls the given directory for files to process
      *
-     * @param fileOrDirectory  current file or directory when doing recursion
-     * @param processDir  recursive
-     * @param fileList  current list of files gathered
+     * @param fileOrDirectory current directory or file
+     * @param processDir      recursive
+     * @param fileList        current list of files gathered
      */
-    protected void scanFilesToPoll(File fileOrDirectory, boolean processDir, List<File> fileList) {
+    protected void pollDirectory(File fileOrDirectory, boolean processDir, List<File> fileList) {
         if (fileOrDirectory == null || !fileOrDirectory.exists()) {
-            // not a file so skip it
             return;
         }
 
-        if (!fileOrDirectory.isDirectory()) {
-            addFile(fileOrDirectory, fileList);
-        // must test matching for directories as well as we want to skip directories starting with a dot etc.
-        } else if (processDir && matchFile(fileOrDirectory)) {
-            // directory that can be recursive
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Polling directory " + fileOrDirectory);
-            }
-            File[] files = fileOrDirectory.listFiles();
-            for (File file : files) {
-                // recursive scan and add the files
-                scanFilesToPoll(file, isRecursive(), fileList);
-            }
-        } else {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Skipping directory " + fileOrDirectory);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Polling directory: " + fileOrDirectory.getPath());
+        }
+        File[] files = fileOrDirectory.listFiles();
+        for (File file : files) {
+            if (processDir && file.isDirectory()) {
+                if (isValidFile(file)) {
+                    // recursive scan and add the sub files and folders
+                    pollDirectory(file, isRecursive(), fileList);
+                }
+            } else if (file.isFile()) {
+                if (isValidFile(file)) {
+                    // matched file so add
+                    fileList.add(file);
+                }
+            } else {
+                LOG.debug("Ignoring unsupported file type " + file);
             }
         }
     }
 
     /**
+     * Polls the given file
+     *
+     * @param file     the file
+     * @param fileList current list of files gathered
+     */
+    protected void pollFile(File file, List<File> fileList) {
+        if (file == null || !file.exists()) {
+            return;
+        }
+
+        if (isValidFile(file)) {
+            // matched file so add
+            fileList.add(file);
+        }
+    }
+
+    /**
      * Processes the given file
      *
-     * @param exchange  the file exchange
+     * @param exchange the file exchange
      */
     protected void processExchange(final FileExchange exchange) {
         final File target = exchange.getFile();
@@ -177,12 +200,12 @@
     /**
      * Strategy when the file was processed and a commit should be executed.
      *
-     * @param processStrategy   the strategy to perform the commit
-     * @param exchange          the exchange
-     * @param file              the file processed
-     * @param failureHandled    is <tt>false</tt> if the exchange was processed succesfully, <tt>true</tt> if
-     * an exception occured during processing but it was handled by the failure processor (usually the
-     * DeadLetterChannel).
+     * @param processStrategy the strategy to perform the commit
+     * @param exchange        the exchange
+     * @param file            the file processed
+     * @param failureHandled  is <tt>false</tt> if the exchange was processed successfully, <tt>true</tt> if
+     *                        an exception occurred during processing but it was handled by the failure processor (usually the
+     *                        DeadLetterChannel).
      */
     protected void processStrategyCommit(FileProcessStrategy processStrategy, FileExchange exchange,
                                          File file, boolean failureHandled) {
@@ -191,7 +214,7 @@
             // use file.getPath as key for the idempotent repository to support files with same name but in different folders
             endpoint.getIdempotentRepository().add(file.getPath());
         }
-        
+
         try {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Committing file strategy: " + processStrategy + " for file: "
@@ -207,9 +230,9 @@
     /**
      * Strategy when the file was not processed and a rollback should be executed.
      *
-     * @param processStrategy   the strategy to perform the commit
-     * @param exchange          the exchange
-     * @param file              the file processed
+     * @param processStrategy the strategy to perform the commit
+     * @param exchange        the exchange
+     * @param file            the file processed
      */
     protected void processStrategyRollback(FileProcessStrategy processStrategy, FileExchange exchange, File file) {
         if (LOG.isDebugEnabled()) {
@@ -220,11 +243,12 @@
 
     /**
      * Strategy for validating if the given file should be included or not
-     * @param file  the file
+     *
+     * @param file the file
      * @return true to include the file, false to skip it
      */
-    protected boolean validateFile(File file) {
-        if (!matchFile(file)) {
+    protected boolean isValidFile(File file) {
+        if (!isMatched(file)) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("File did not match. Will skip this file: " + file);
             }
@@ -244,15 +268,15 @@
      * <p/>
      * Will always return <tt>false</tt> for certain files/folders:
      * <ul>
-     *    <li>Starting with a dot</li>
-     *    <li>lock files</li>
+     *   <li>Starting with a dot</li>
+     *   <li>lock files</li>
      * </ul>
      * And then <tt>true</tt> for directories.
      *
-     * @param file  the file
-     * @return true if the file is matche, false if not
+     * @param file the file
+     * @return true if the file is matched, false if not
      */
-    protected boolean matchFile(File file) {
+    protected boolean isMatched(File file) {
         String name = file.getName();
 
         // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
@@ -295,12 +319,6 @@
         return true;
     }
 
-    private void addFile(File file, List<File> fileList) {
-        if (validateFile(file)) {
-            fileList.add(file);
-        }
-    }
-
     public boolean isRecursive() {
         return this.recursive;
     }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java Sun Dec 28 01:31:50 2008
@@ -183,7 +183,7 @@
     }
 
     public boolean isAutoCreate() {
-        return this.autoCreate;
+        return autoCreate;
     }
 
     public void setAutoCreate(boolean autoCreate) {
@@ -193,7 +193,9 @@
     public FileProcessStrategy getFileStrategy() {
         if (fileProcessStrategy == null) {
             fileProcessStrategy = createFileStrategy();
-            LOG.debug("Using file process strategy: " + fileProcessStrategy);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Using file process strategy: " + fileProcessStrategy);
+            }
         }
         return fileProcessStrategy;
     }

Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java (from r729464, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeSingleDirectoryOnlyTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeSingleDirectoryOnlyTest.java&r1=729464&r2=729712&rev=729712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeSingleDirectoryOnlyTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java Sun Dec 28 01:31:50 2008
@@ -16,37 +16,61 @@
  */
 package org.apache.camel.component.file;
 
+import java.io.File;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * Unit test for consuming the single directory only.
+ * Unit test for consuming multiple directories.
  */
-public class FileConsumeSingleDirectoryOnlyTest extends ContextTestSupport {
+public class FileConsumeMultipleDirectoriesTest extends ContextTestSupport {
+
+    private String fileUrl = "file://target/multidir/?consumer.recursive=true&delete=true&consumer.delay=5000&sortBy=file:path";
 
     @Override
     protected void setUp() throws Exception {
         super.setUp();
-        deleteDirectory("target/singledirectoryonly");
-        template.sendBodyAndHeader("file://target/singledirectoryonly", "Hello World", FileComponent.HEADER_FILE_NAME, "report.txt");
-        template.sendBodyAndHeader("file://target/singledirectoryonly", "Bye World", FileComponent.HEADER_FILE_NAME, "report2.txt");
-        template.sendBodyAndHeader("file://target/singledirectoryonly/2008", "2008 Report", FileComponent.HEADER_FILE_NAME, "report2008.txt");
+        deleteDirectory("target/multidir");
+        template.sendBodyAndHeader(fileUrl, "Bye World", FileComponent.HEADER_FILE_NAME, "bye.txt");
+        template.sendBodyAndHeader(fileUrl, "Hello World", FileComponent.HEADER_FILE_NAME, "sub/hello.txt");
+        template.sendBodyAndHeader(fileUrl, "Godday World", FileComponent.HEADER_FILE_NAME, "sub/sub2/godday.txt");
     }
 
-    public void testConsumeFileOnly() throws Exception {
+    public void testMultiDir() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+        mock.expectedBodiesReceived("Bye World", "Hello World", "Godday World");
 
         assertMockEndpointsSatisfied();
+
+        FileExchange exchange = (FileExchange) mock.getExchanges().get(0);
+        File file = exchange.getFile();
+        assertFilePath("target/multidir/bye.txt", file.getPath());
+        assertEquals("bye.txt", file.getName());
+
+        exchange = (FileExchange) mock.getExchanges().get(1);
+        file = exchange.getFile();
+        assertFilePath("target/multidir/sub/hello.txt", file.getPath());
+        assertEquals("hello.txt", file.getName());
+
+        exchange = (FileExchange) mock.getExchanges().get(2);
+        file = exchange.getFile();
+        assertFilePath("target/multidir/sub/sub2/godday.txt", file.getPath());
+        assertEquals("godday.txt", file.getName());
+    }
+
+    private static void assertFilePath(String expected, String actual) {
+        actual = actual.replaceAll("\\\\", "/");
+        assertEquals(expected, actual);
     }
 
-    @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("file://target/singledirectoryonly/?consumer.recursive=false&delete=true").to("mock:result");
+                from(fileUrl).to("mock:result");
             }
         };
     }
-}
+
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeMultipleDirectoriesTest.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java Sun Dec 28 01:31:50 2008
@@ -36,7 +36,9 @@
     public void testManually() throws Exception {
         deleteDirectory("./target/exclusiveread");
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.setSleepForEmptyTest(10000);
+        // this is used for manual testing where you can copy/lock files etc. while this test runs
+        //mock.setSleepForEmptyTest(10 * 1000L);
+        mock.setSleepForEmptyTest(100);
         mock.expectedMessageCount(0);
 
         mock.assertIsSatisfied();

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java Sun Dec 28 01:31:50 2008
@@ -18,7 +18,6 @@
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.nio.channels.FileLock;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
@@ -51,7 +50,6 @@
         deleteDirectory("./target/exclusiveread");
         createDirectory("./target/exclusiveread/slowfile");
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World");
         mock.expectedMessageCount(1);
 
         // send a message to seda:start to trigger the creating of the slowfile to poll
@@ -67,7 +65,7 @@
     private class MySlowFileProcessor implements Processor {
 
         public void process(Exchange exchange) throws Exception {
-            LOG.info("Creating a slow fil with no locks...");
+            LOG.info("Creating a slow file with no locks...");
             File file = new File("./target/exclusiveread/slowfile/hello.txt");
             FileOutputStream fos = new FileOutputStream(file);
             fos.write("Hello World".getBytes());

Modified: activemq/camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ activemq/camel/trunk/camel-core/src/test/resources/log4j.properties Sun Dec 28 01:31:50 2008
@@ -18,7 +18,7 @@
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=INFO, out
+log4j.rootLogger=INFO, file
 
 log4j.logger.org.apache.activemq.spring=WARN
 log4j.logger.org.apache.camel=DEBUG
@@ -26,13 +26,13 @@
 log4j.logger.org.apache.camel.impl.converter=WARN
 
 # CONSOLE appender not used by default
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-
-# File appender
-log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out=org.apache.log4j.ConsoleAppender
 log4j.appender.out.layout=org.apache.log4j.PatternLayout
 log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.out.file=target/camel-core-test.log
-log4j.appender.out.append=true
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-core-test.log
+log4j.appender.file.append=true

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java (original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java Sun Dec 28 01:31:50 2008
@@ -45,15 +45,19 @@
         List<FTPFile> files = operations.listFiles(fileName);
         for (FTPFile file : files) {
             RemoteFile<FTPFile> remote = asRemoteFile(fileName, file);
-            if (processDir && file.isDirectory() && isValidFile(remote, true)) {
-                // recursive scan and add the sub files and folders
-                String directory = fileName + "/" + file.getName();
-                pollDirectory(directory, endpoint.isRecursive(), fileList);
-            } else if (file.isFile() && isValidFile(remote, false)) {
-                // matched file so add
-                fileList.add(remote);
+            if (processDir && file.isDirectory()) {
+                if (isValidFile(remote, true)) {
+                    // recursive scan and add the sub files and folders
+                    String directory = fileName + "/" + file.getName();
+                    pollDirectory(directory, endpoint.isRecursive(), fileList);
+                }
+            } else if (file.isFile()) {
+                if (isValidFile(remote, false)) {
+                    // matched file so add
+                    fileList.add(remote);
+                }
             } else {
-                log.debug("Ignoring unsupported file type " + file);
+                log.debug("Ignoring unsupported remote file type: " + file);
             }
         }
     }
@@ -69,7 +73,10 @@
         FTPFile file = list.get(0);
         if (file != null) {
             RemoteFile remoteFile = asRemoteFile(directory, file);
-            fileList.add(remoteFile);
+            if (isValidFile(remoteFile, false)) {
+                // matched file so add
+                fileList.add(remoteFile);
+            }
         }
     }
 

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java (original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Sun Dec 28 01:31:50 2008
@@ -48,8 +48,59 @@
         this.operations = operations;
     }
 
+    protected void poll() throws Exception {
+        connectIfNecessary();
+        if (!loggedIn) {
+            String message = "Could not connect/login to: " + endpoint.remoteServerInformation() + ". Will skip this poll.";
+            log.warn(message);
+            return;
+        }
+
+        // gather list of files to process
+        List<RemoteFile> files = new ArrayList<RemoteFile>();
+
+        String name = endpoint.getConfiguration().getFile();
+        boolean isDirectory = endpoint.getConfiguration().isDirectory();
+        if (isDirectory) {
+            pollDirectory(name, endpoint.isRecursive(), files);
+        } else {
+            pollFile(name, files);
+        }
+
+        // sort files using file comparator if provided
+        if (endpoint.getSorter() != null) {
+            Collections.sort(files, endpoint.getSorter());
+        }
+
+        // sort using built-in sorters that are expression based
+        // first we need to convert to RemoteFileExchange objects so we can sort using expressions
+        List<RemoteFileExchange> exchanges = new ArrayList<RemoteFileExchange>(files.size());
+        for (RemoteFile file : files) {
+            RemoteFileExchange exchange = endpoint.createExchange(file);
+            endpoint.configureMessage(file, exchange.getIn());
+            exchanges.add(exchange);
+        }
+        // sort files using exchange comparator if provided
+        if (endpoint.getSortBy() != null) {
+            Collections.sort(exchanges, endpoint.getSortBy());
+        }
+
+        // consume files one by one
+        int total = exchanges.size();
+        if (total > 0 && log.isDebugEnabled()) {
+            log.debug("Total " + total + " files to consume");
+        }
+        for (int index = 0; index < total; index++) {
+            RemoteFileExchange exchange = exchanges.get(index);
+            // add current index and total as headers
+            exchange.getIn().setHeader(FileComponent.HEADER_FILE_BATCH_INDEX, index);
+            exchange.getIn().setHeader(FileComponent.HEADER_FILE_BATCH_TOTAL, total);
+            processExchange(exchange);
+        }
+    }
+
     /**
-     * Polls the given directory for files to process.
+     * Polls the given directory for files to process
      *
      * @param fileName    current directory or file
      * @param processDir  recursive
@@ -183,12 +234,12 @@
      *
      * @param file         the remote file
      * @param isDirectory  wether the file is a directory or a file
-     * @return <tttrue</tt> to include the file, <tt>false</tt> to skip it
+     * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
      */
     protected boolean isValidFile(RemoteFile file, boolean isDirectory) {
         if (!isMatched(file, isDirectory)) {
             if (log.isTraceEnabled()) {
-                log.trace("File did not match. Will skip this file: " + file);
+                log.trace("Remote file did not match. Will skip this remote file: " + file);
             }
             return false;
         }
@@ -198,11 +249,18 @@
     }
 
     /**
-     * Is the given file matched to be consumed.
+     * Strategy to perform file matching based on endpoint configuration.
+     * <p/>
+     * Will always return <tt>false</tt> for certain files/folders:
+     * <ul>
+     *   <li>Starting with a dot</li>
+     *   <li>lock files</li>
+     * </ul>
+     * And then <tt>true</tt> for directories.
      *
      * @param file         the remote file
-     * @param isDirectory  is the given file a directory or a file
-     * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
+     * @param isDirectory  whether the file is a directory or a file
+     * @return <tt>true</tt> if the remote file is matched, <tt>false</tt> if not
      */
     protected boolean isMatched(RemoteFile file, boolean isDirectory) {
         String name = file.getFileName();
@@ -285,55 +343,4 @@
     protected String remoteServer() {
         return endpoint.remoteServerInformation();
     }
-
-    protected void poll() throws Exception {
-        connectIfNecessary();
-        if (!loggedIn) {
-            String message = "Could not connect/login to: " + endpoint.remoteServerInformation() + ". Will skip this poll.";
-            log.warn(message);
-            return;
-        }
-
-        // gather list of files to process
-        List<RemoteFile> files = new ArrayList<RemoteFile>();
-
-        String name = endpoint.getConfiguration().getFile();
-        boolean isDirectory = endpoint.getConfiguration().isDirectory();
-        if (isDirectory) {
-            pollDirectory(name, true, files);
-        } else {
-            pollFile(name, files);
-        }
-
-        // sort files using file comparator if provided
-        if (endpoint.getSorter() != null) {
-            Collections.sort(files, endpoint.getSorter());
-        }
-
-        // sort using build in sorters that is expression based
-        // first we need to convert to RemoteFileExchange objects so we can sort using expressions
-        List<RemoteFileExchange> exchanges = new ArrayList<RemoteFileExchange>(files.size());
-        for (RemoteFile file : files) {
-            RemoteFileExchange exchange = endpoint.createExchange(file);
-            endpoint.configureMessage(file, exchange.getIn());
-            exchanges.add(exchange);
-        }
-          // sort files using exchange comparator if provided
-        if (endpoint.getSortBy() != null) {
-            Collections.sort(exchanges, endpoint.getSortBy());
-        }
-
-        // consume files one by one
-        int total = exchanges.size();
-        if (total > 0 && log.isDebugEnabled()) {
-            log.debug("Total " + total + " files to consume");
-        }
-        for (int index = 0; index < total; index++) {
-            RemoteFileExchange exchange = exchanges.get(index);
-            // add current index and total as headers
-            exchange.getIn().setHeader(FileComponent.HEADER_FILE_BATCH_INDEX, index);
-            exchange.getIn().setHeader(FileComponent.HEADER_FILE_BATCH_TOTAL, total);
-            processExchange(exchange);
-        }
-    }
 }

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=729712&r1=729711&r2=729712&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java (original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Sun Dec 28 01:31:50 2008
@@ -35,27 +35,30 @@
             return;
         }
 
-        String currentDir = operations.getCurrentDirectory();
-        operations.changeCurrentDirectory(fileName);
+        if (fileName.endsWith("/")) {
+            fileName = fileName.substring(0, fileName.length() - 1);
+        }
 
         if (log.isTraceEnabled()) {
             log.trace("Polling directory: " + fileName);
         }
-        List<ChannelSftp.LsEntry> files = operations.listFiles();
+        List<ChannelSftp.LsEntry> files = operations.listFiles(fileName);
         for (ChannelSftp.LsEntry file : files) {
-            RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(file);
-            if (processDir && file.getAttrs().isDir() && isValidFile(remote, true)) {
-                // recursive scan and add the sub files and folders
-                pollDirectory(file.getFilename(), endpoint.isRecursive(), fileList);
-            } else if (!file.getAttrs().isLink() && isValidFile(remote, false)) {
-                // matched file so add
-                fileList.add(remote);
+            RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(fileName, file);
+            if (processDir && file.getAttrs().isDir()) {
+                if (isValidFile(remote, true)) {
+                    // recursive scan and add the sub files and folders
+                    pollDirectory(file.getFilename(), endpoint.isRecursive(), fileList);
+                }
+            } else if (!file.getAttrs().isLink()) {
+                if (isValidFile(remote, false)) {
+                    // matched file so add
+                    fileList.add(remote);
+                }
             } else {
-                log.debug("Ignoring unsupported file type " + file);
+                log.debug("Ignoring unsupported remote file type: " + file);
             }
         }
-
-        operations.changeCurrentDirectory(currentDir);
     }
 
     /**
@@ -65,33 +68,36 @@
      * @param fileList  current list of files gathered
      */
     protected void pollFile(String fileName, List<RemoteFile> fileList) {
+        String directory = ".";
         int index = fileName.lastIndexOf("/");
         if (index > -1) {
-            // cd to the folder of the filename
-            operations.changeCurrentDirectory(fileName.substring(0, index));
+            directory = fileName.substring(0, index);
         }
         // list the files in the fold and poll the first file
-        List<ChannelSftp.LsEntry> list = operations.listFiles(fileName.substring(index + 1));
+        List<ChannelSftp.LsEntry> list = operations.listFiles(fileName);
         ChannelSftp.LsEntry file = list.get(0);
         if (file != null) {
-            RemoteFile remoteFile = asRemoteFile(file);
-            fileList.add(remoteFile);
+            RemoteFile remoteFile = asRemoteFile(directory, file);
+            if (isValidFile(remoteFile, false)) {
+                // matched file so add
+                fileList.add(remoteFile);
+            }
         }
     }
 
-    private RemoteFile<ChannelSftp.LsEntry> asRemoteFile(ChannelSftp.LsEntry file) {
+    private RemoteFile<ChannelSftp.LsEntry> asRemoteFile(String directory, ChannelSftp.LsEntry file) {
         RemoteFile<ChannelSftp.LsEntry> remote = new RemoteFile<ChannelSftp.LsEntry>();
         remote.setFile(file);
         remote.setFileName(file.getFilename());
         remote.setFileLength(file.getAttrs().getSize());
         remote.setLastModified(file.getAttrs().getMTime() * 1000L);
         remote.setHostname(endpoint.getConfiguration().getHost());
-        String absoluteFileName = getAbsoluteFileName(file);
+        String absoluteFileName = directory + "/" + file.getFilename();
         remote.setAbsolutelFileName(absoluteFileName);
 
         // the relative filename
         String ftpBasePath = endpoint.getConfiguration().getFile();
-        String relativePath = absoluteFileName.substring(ftpBasePath.length() + 1);
+        String relativePath = absoluteFileName.substring(ftpBasePath.length());
         if (relativePath.startsWith("/")) {
             relativePath = relativePath.substring(1);
         }
@@ -100,8 +106,4 @@
         return remote;
     }
 
-    private String getAbsoluteFileName(ChannelSftp.LsEntry ftpFile) {
-        return operations.getCurrentDirectory() + "/" + ftpFile.getFilename();
-    }
-
 }