You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/12/17 09:38:29 UTC

svn commit: r1422793 - in /camel/branches/camel-2.10.x: ./ camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/test/java/org/apache/camel/component/file/ components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/

Author: davsclaus
Date: Mon Dec 17 08:38:27 2012
New Revision: 1422793

URL: http://svn.apache.org/viewvc?rev=1422793&view=rev
Log:
CAMEL-5848: file/ftp consumers - When using doneFileName, avoid picking up files in the middle of a group if the done file is written during scanning

Added:
    camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java
      - copied unchanged from r1422792, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java
Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
    camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
    camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
    camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1422792

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=1422793&r1=1422792&r2=1422793&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Mon Dec 17 08:38:27 2012
@@ -17,6 +17,7 @@
 package org.apache.camel.component.file;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.camel.Processor;
@@ -51,8 +52,8 @@ public class FileConsumer extends Generi
         }
 
         log.trace("Polling directory: {}", directory.getPath());
-        File[] files = directory.listFiles();
-        if (files == null || files.length == 0) {
+        File[] dirFiles = directory.listFiles();
+        if (dirFiles == null || dirFiles.length == 0) {
             // no files in this directory to poll
             if (log.isTraceEnabled()) {
                 log.trace("No files found in directory: {}", directory.getPath());
@@ -61,9 +62,10 @@ public class FileConsumer extends Generi
         } else {
             // we found some files
             if (log.isTraceEnabled()) {
-                log.trace("Found {} in directory: {}", files.length, directory.getPath());
+                log.trace("Found {} in directory: {}", dirFiles.length, directory.getPath());
             }
         }
+        List<File> files = Arrays.asList(dirFiles);
 
         for (File file : files) {
             // check if we can continue polling in files
@@ -81,7 +83,7 @@ public class FileConsumer extends Generi
             GenericFile<File> gf = asGenericFile(endpointPath, file, getEndpoint().getCharset());
 
             if (file.isDirectory()) {
-                if (endpoint.isRecursive() && isValidFile(gf, true) && depth < endpoint.getMaxDepth()) {
+                if (endpoint.isRecursive() && isValidFile(gf, true, files) && depth < endpoint.getMaxDepth()) {
                     // recursive scan and add the sub files and folders
                     String subDirectory = fileName + File.separator + file.getName();
                     boolean canPollMore = pollDirectory(subDirectory, fileList, depth);
@@ -91,7 +93,7 @@ public class FileConsumer extends Generi
                 }
             } else {
                 // Windows can report false to a file on a share so regard it always as a file (if its not a directory)
-                if (isValidFile(gf, false) && depth >= endpoint.minDepth) {
+                if (isValidFile(gf, false, files) && depth >= endpoint.minDepth) {
                     if (isInProgress(gf)) {
                         if (log.isTraceEnabled()) {
                             log.trace("Skipping as file is already in progress: {}", gf.getFileName());
@@ -109,6 +111,19 @@ public class FileConsumer extends Generi
         return true;
     }
 
+    @Override
+    protected boolean isMatched(GenericFile<File> file, String doneFileName, List<File> files) {
+        String onlyName = FileUtil.stripPath(doneFileName); // name to look for; compared against File.getName() below
+        // the done file name must be among the given files; matching is by name only against this list
+        for (File f : files) {
+            if (f.getName().equals(onlyName)) {
+                return true;
+            }
+        }
+        log.trace("Done file: {} does not exist", doneFileName); // no entry with that name in the given files
+        return false;
+    }
+
     /**
      * Creates a new GenericFile<File> based on the given file.
      *

Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1422793&r1=1422792&r2=1422793&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Mon Dec 17 08:38:27 2012
@@ -389,10 +389,11 @@ public abstract class GenericFileConsume
      *
      * @param file        the file
      * @param isDirectory whether the file is a directory or a file
+     * @param files       files in the directory
      * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
      */
-    protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) {
-        if (!isMatched(file, isDirectory)) {
+    protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) {
+        if (!isMatched(file, isDirectory, files)) {
             log.trace("File did not match. Will skip this file: {}", file);
             return false;
         } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) {
@@ -416,9 +417,10 @@ public abstract class GenericFileConsume
      *
      * @param file        the file
      * @param isDirectory whether the file is a directory or a file
+     * @param files       files in the directory
      * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
      */
-    protected boolean isMatched(GenericFile<T> file, boolean isDirectory) {
+    protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) {
         String name = file.getFileNameOnly();
 
         // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
@@ -482,7 +484,7 @@ public abstract class GenericFileConsume
                 return false;
             }
 
-            if (!isMatched(file, doneFileName)) {
+            if (!isMatched(file, doneFileName, files)) {
                 return false;
             }
         }
@@ -494,19 +496,11 @@ public abstract class GenericFileConsume
      * Strategy to perform file matching based on endpoint configuration in terms of done file name.
      *
      * @param file         the file
-     * @param doneFileName the done file name
+     * @param doneFileName the done file name (without any paths)
+     * @param files        files in the directory
      * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
      */
-    protected boolean isMatched(GenericFile<T> file, String doneFileName) {
-        // the file is only valid if the done file exist
-        if (!operations.existsFile(doneFileName)) {
-            log.trace("Done file: {} does not exist", doneFileName);
-            return false;
-        }
-
-        // assume matched
-        return true;
-    }
+    protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files);
 
     /**
      * Is the given file already in progress.

Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1422793&r1=1422792&r2=1422793&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Mon Dec 17 08:38:27 2012
@@ -794,7 +794,6 @@ public abstract class GenericFileEndpoin
         if (ObjectHelper.isNotEmpty(path) && ObjectHelper.isNotEmpty(pattern)) {
             // done file must always be in same directory as the real file name
             answer = path + getFileSeparator() + pattern;
-            answer = path + File.separator + pattern;
         }
 
         if (getConfiguration().needToNormalize()) {

Modified: camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=1422793&r1=1422792&r2=1422793&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java Mon Dec 17 08:38:27 2012
@@ -108,7 +108,7 @@ public class FtpConsumer extends RemoteF
 
             if (file.isDirectory()) {
                 RemoteFile<FTPFile> remote = asRemoteFile(absolutePath, file);
-                if (endpoint.isRecursive() && isValidFile(remote, true) && depth < endpoint.getMaxDepth()) {
+                if (endpoint.isRecursive() && isValidFile(remote, true, files) && depth < endpoint.getMaxDepth()) {
                     // recursive scan and add the sub files and folders
                     String subDirectory = file.getName();
                     String path = absolutePath + "/" + subDirectory;
@@ -119,7 +119,7 @@ public class FtpConsumer extends RemoteF
                 }
             } else if (file.isFile()) {
                 RemoteFile<FTPFile> remote = asRemoteFile(absolutePath, file);
-                if (isValidFile(remote, false) && depth >= endpoint.getMinDepth()) {
+                if (isValidFile(remote, false, files) && depth >= endpoint.getMinDepth()) {
                     if (isInProgress(remote)) {
                         log.trace("Skipping as file is already in progress: {}", remote.getFileName());
                     } else {
@@ -135,6 +135,20 @@ public class FtpConsumer extends RemoteF
         return true;
     }
 
+    @Override
+    protected boolean isMatched(GenericFile<FTPFile> file, String doneFileName, List<FTPFile> files) {
+        String onlyName = FileUtil.stripPath(doneFileName);
+        // the done file name must be among the given files; matching is by name only against this list
+        for (FTPFile f : files) {
+            if (f.getName().equals(onlyName)) {
+                return true;
+            }
+        }
+        // no entry with that name in the given files
+        log.trace("Done file: {} does not exist", doneFileName);
+        return false;
+    }
+
     private RemoteFile<FTPFile> asRemoteFile(String absolutePath, FTPFile file) {
         RemoteFile<FTPFile> answer = new RemoteFile<FTPFile>();
 

Modified: camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=1422793&r1=1422792&r2=1422793&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Mon Dec 17 08:38:27 2012
@@ -20,10 +20,8 @@ import java.io.IOException;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileConsumer;
 import org.apache.camel.component.file.GenericFileOperationFailedException;
-import org.apache.camel.util.FileUtil;
 
 /**
  * Base class for remote file consumers.
@@ -155,14 +153,4 @@ public abstract class RemoteFileConsumer
         return ((RemoteFileEndpoint<?>) endpoint).remoteServerInformation();
     }
 
-    @Override
-    protected boolean isMatched(GenericFile<T> file, String doneFileName) {
-        // ftp specific as we need to cater for stepwise
-        if (getEndpoint().getConfiguration().isStepwise()) {
-            // stepwise enabled, so done file should always be without path
-            doneFileName = FileUtil.stripPath(doneFileName);
-        }
-
-        return super.isMatched(file, doneFileName);
-    }
 }

Modified: camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=1422793&r1=1422792&r2=1422793&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Mon Dec 17 08:38:27 2012
@@ -105,7 +105,7 @@ public class SftpConsumer extends Remote
 
             if (file.getAttrs().isDir()) {
                 RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(absolutePath, file);
-                if (endpoint.isRecursive() && isValidFile(remote, true) && depth < endpoint.getMaxDepth()) {
+                if (endpoint.isRecursive() && isValidFile(remote, true, files) && depth < endpoint.getMaxDepth()) {
                     // recursive scan and add the sub files and folders
                     String subDirectory = file.getFilename();
                     String path = absolutePath + "/" + subDirectory;
@@ -118,7 +118,7 @@ public class SftpConsumer extends Remote
                 // just assuming its a file we should poll
             } else {
                 RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(absolutePath, file);
-                if (isValidFile(remote, false) && depth >= endpoint.getMinDepth()) {
+                if (isValidFile(remote, false, files) && depth >= endpoint.getMinDepth()) {
                     if (isInProgress(remote)) {
                         if (log.isTraceEnabled()) {
                             log.trace("Skipping as file is already in progress: {}", remote.getFileName());
@@ -134,6 +134,20 @@ public class SftpConsumer extends Remote
         return true;
     }
 
+    @Override
+    protected boolean isMatched(GenericFile<ChannelSftp.LsEntry> file, String doneFileName, List<ChannelSftp.LsEntry> files) {
+        String onlyName = FileUtil.stripPath(doneFileName);
+        // the done file name must be among the given files; matching is by name only against this list
+        for (ChannelSftp.LsEntry f : files) {
+            if (f.getFilename().equals(onlyName)) {
+                return true;
+            }
+        }
+        // no entry with that name in the given files
+        log.trace("Done file: {} does not exist", doneFileName);
+        return false;
+    }
+
     private RemoteFile<ChannelSftp.LsEntry> asRemoteFile(String absolutePath, ChannelSftp.LsEntry file) {
         RemoteFile<ChannelSftp.LsEntry> answer = new RemoteFile<ChannelSftp.LsEntry>();