You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/06 10:20:13 UTC

svn commit: r960839 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/ components/camel-spring-security/

Author: davsclaus
Date: Tue Jul  6 08:20:12 2010
New Revision: 960839

URL: http://svn.apache.org/viewvc?rev=960839&view=rev
Log:
CAMEL-2899: maxMessagesPerPoll is applied eagerly for the file/ftp components to avoid excessive memory consumption when polling directories with 100,000 or more files.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
    camel/trunk/components/camel-spring-security/pom.xml

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=960839&r1=960838&r2=960839&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Tue Jul  6 08:20:12 2010
@@ -35,7 +35,7 @@ public class FileConsumer extends Generi
         this.endpointPath = endpoint.getConfiguration().getDirectory();
     }
 
-    protected void pollDirectory(String fileName, List<GenericFile<File>> fileList) {
+    protected boolean pollDirectory(String fileName, List<GenericFile<File>> fileList) {
         if (log.isTraceEnabled()) {
             log.trace("pollDirectory from fileName: " + fileName);
         }
@@ -45,7 +45,7 @@ public class FileConsumer extends Generi
             if (log.isDebugEnabled()) {
                 log.debug("Cannot poll as directory does not exists or its not a directory: " + directory);
             }
-            return;
+            return true;
         }
 
         if (log.isTraceEnabled()) {
@@ -57,7 +57,7 @@ public class FileConsumer extends Generi
             if (log.isTraceEnabled()) {
                 log.trace("No files found in directory: " + directory.getPath());
             }
-            return;
+            return true;
         } else {
             // we found some files
             if (log.isTraceEnabled()) {
@@ -66,6 +66,11 @@ public class FileConsumer extends Generi
         }
 
         for (File file : files) {
+            // check if we can continue polling in files
+            if (!canPollMoreFiles(fileList)) {
+                return false;
+            }
+
             // trace log as Windows/Unix can have different views what the file is?
             if (log.isTraceEnabled()) {
                 log.trace("Found file: " + file + " [isAbsolute: " + file.isAbsolute() + ", isDirectory: "
@@ -79,7 +84,10 @@ public class FileConsumer extends Generi
                 if (endpoint.isRecursive() && isValidFile(gf, true)) {
                     // recursive scan and add the sub files and folders
                     String subDirectory = fileName + File.separator + file.getName();
-                    pollDirectory(subDirectory, fileList);
+                    boolean canPollMore = pollDirectory(subDirectory, fileList);
+                    if (!canPollMore) {
+                        return false;
+                    }
                 }
             } else {
                 // Windows can report false to a file on a share so regard it always as a file (if its not a directory)
@@ -98,6 +106,8 @@ public class FileConsumer extends Generi
                 }
             }
         }
+
+        return true;
     }
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=960839&r1=960838&r2=960839&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Tue Jul  6 08:20:12 2010
@@ -73,9 +73,15 @@ public abstract class GenericFileConsume
 
         // gather list of files to process
         List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
-
         String name = endpoint.getConfiguration().getDirectory();
-        pollDirectory(name, files);
+        boolean limitHit = !pollDirectory(name, files);
+
+        // log if we hit the limit
+        if (limitHit) {
+            if (log.isDebugEnabled()) {
+                log.debug("Limiting maximum messages to poll at " + maxMessagesPerPoll + " files as there was more messages in this poll.");
+            }
+        }
 
         // sort files using file comparator if provided
         if (endpoint.getSorter() != null) {
@@ -180,6 +186,20 @@ public abstract class GenericFileConsume
     }
 
     /**
+     * Whether or not we can continue polling for more files
+     *
+     * @param fileList  the current list of gathered files
+     * @return <tt>true</tt> to continue, <tt>false</tt> to stop due to hitting the maxMessagesPerPoll limit
+     */
+    public boolean canPollMoreFiles(List fileList) {
+        if (maxMessagesPerPoll > 0 && fileList.size() >= maxMessagesPerPoll) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    /**
      * Override if required. Perform some checks (and perhaps actions) before we
      * poll.
      *
@@ -202,8 +222,9 @@ public abstract class GenericFileConsume
      *
      * @param fileName current directory or file
      * @param fileList current list of files gathered
+     * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit
      */
-    protected abstract void pollDirectory(String fileName, List<GenericFile<T>> fileList);
+    protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList);
 
     /**
      * Sets the operations to be used.

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=960839&r1=960838&r2=960839&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java Tue Jul  6 08:20:12 2010
@@ -36,9 +36,12 @@ public class FtpConsumer extends RemoteF
         this.endpointPath = endpoint.getConfiguration().getDirectory();
     }
 
-    protected void pollDirectory(String fileName, List<GenericFile<FTPFile>> fileList) {
+    protected boolean pollDirectory(String fileName, List<GenericFile<FTPFile>> fileList) {
+        if (log.isTraceEnabled()) {
+            log.trace("pollDirectory from fileName: " + fileName);
+        }
         if (fileName == null) {
-            return;
+            return true;
         }
 
         // remove trailing /
@@ -48,13 +51,35 @@ public class FtpConsumer extends RemoteF
             log.trace("Polling directory: " + fileName);
         }
         List<FTPFile> files = operations.listFiles(fileName);
+        if (files == null || files.isEmpty()) {
+            // no files in this directory to poll
+            if (log.isTraceEnabled()) {
+                log.trace("No files found in directory: " + fileName);
+            }
+            return true;
+        } else {
+            // we found some files
+            if (log.isTraceEnabled()) {
+                log.trace("Found " + files.size() + " in directory: " + fileName);
+            }
+        }
+
         for (FTPFile file : files) {
+
+            // check if we can continue polling in files
+            if (!canPollMoreFiles(fileList)) {
+                return false;
+            }
+
             if (file.isDirectory()) {
                 RemoteFile<FTPFile> remote = asRemoteFile(fileName, file);
                 if (endpoint.isRecursive() && isValidFile(remote, true)) {
                     // recursive scan and add the sub files and folders
-                    String directory = fileName + "/" + file.getName();
-                    pollDirectory(directory, fileList);
+                    String subDirectory = fileName + "/" + file.getName();
+                    boolean canPollMore = pollDirectory(subDirectory, fileList);
+                    if (!canPollMore) {
+                        return false;
+                    }
                 }
             } else if (file.isFile()) {
                 RemoteFile<FTPFile> remote = asRemoteFile(fileName, file);
@@ -72,6 +97,8 @@ public class FtpConsumer extends RemoteF
                 log.debug("Ignoring unsupported remote file type: " + file);
             }
         }
+
+        return true;
     }
 
     private RemoteFile<FTPFile> asRemoteFile(String directory, FTPFile file) {

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=960839&r1=960838&r2=960839&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Tue Jul  6 08:20:12 2010
@@ -36,9 +36,13 @@ public class SftpConsumer extends Remote
         this.endpointPath = endpoint.getConfiguration().getDirectory();
     }
 
-    protected void pollDirectory(String fileName, List<GenericFile<ChannelSftp.LsEntry>> fileList) {
+    protected boolean pollDirectory(String fileName, List<GenericFile<ChannelSftp.LsEntry>> fileList) {
+        if (log.isTraceEnabled()) {
+            log.trace("pollDirectory from fileName: " + fileName);
+        }
+
         if (fileName == null) {
-            return;
+            return true;
         }
 
         // remove trailing /
@@ -48,13 +52,35 @@ public class SftpConsumer extends Remote
             log.trace("Polling directory: " + fileName);
         }
         List<ChannelSftp.LsEntry> files = operations.listFiles(fileName);
+        if (files == null || files.isEmpty()) {
+            // no files in this directory to poll
+            if (log.isTraceEnabled()) {
+                log.trace("No files found in directory: " + fileName);
+            }
+            return true;
+        } else {
+            // we found some files
+            if (log.isTraceEnabled()) {
+                log.trace("Found " + files.size() + " in directory: " + fileName);
+            }
+        }
+
         for (ChannelSftp.LsEntry file : files) {
+
+            // check if we can continue polling in files
+            if (!canPollMoreFiles(fileList)) {
+                return false;
+            }
+
             if (file.getAttrs().isDir()) {
                 RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(fileName, file);
                 if (endpoint.isRecursive() && isValidFile(remote, true)) {
                     // recursive scan and add the sub files and folders
-                    String directory = fileName + "/" + file.getFilename();
-                    pollDirectory(directory, fileList);
+                    String subDirectory = fileName + "/" + file.getFilename();
+                    boolean canPollMore = pollDirectory(subDirectory, fileList);
+                    if (!canPollMore) {
+                        return false;
+                    }
                 }
                 // we cannot use file.getAttrs().isLink on Windows, so we dont invoke the method
                 // just assuming its a file we should poll
@@ -72,6 +98,8 @@ public class SftpConsumer extends Remote
                 }
             }
         }
+
+        return true;
     }
 
     private RemoteFile<ChannelSftp.LsEntry> asRemoteFile(String directory, ChannelSftp.LsEntry file) {

Modified: camel/trunk/components/camel-spring-security/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring-security/pom.xml?rev=960839&r1=960838&r2=960839&view=diff
==============================================================================
--- camel/trunk/components/camel-spring-security/pom.xml (original)
+++ camel/trunk/components/camel-spring-security/pom.xml Tue Jul  6 08:20:12 2010
@@ -9,7 +9,7 @@
 
 	<artifactId>camel-spring-security</artifactId>
 	<packaging>bundle</packaging>
-	<name>Camel :: Spring :: Security </name>
+	<name>Camel :: Spring Security </name>
 	<description>Camel Spring Security support</description>
 
 	<properties>