You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/08/16 10:27:47 UTC

svn commit: r1373751 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/component/file/strategy/ camel-core/src/test/java/org/apache/camel/component/file/strategy/ components/camel-ftp...

Author: davsclaus
Date: Thu Aug 16 08:27:47 2012
New Revision: 1373751

URL: http://svn.apache.org/viewvc?rev=1373751&view=rev
Log:
CAMEL-5513: Added readLockMinLength option (default 1) so that the changed read lock can accept zero-length files when the option is set to 0.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedZeroLengthReadLockTest.java
      - copied, changed from r1373737, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedZeroLengthReadLockTest.java
      - copied, changed from r1373737, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1373751&r1=1373750&r2=1373751&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Thu Aug 16 08:27:47 2012
@@ -93,6 +93,7 @@ public abstract class GenericFileEndpoin
     protected String readLock = "none";
     protected long readLockCheckInterval = 1000;
     protected long readLockTimeout = 10000;
+    protected long readLockMinLength = 1;
     protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
     protected boolean keepLastModified;
     protected String doneFileName;
@@ -510,6 +511,14 @@ public abstract class GenericFileEndpoin
         this.readLockTimeout = readLockTimeout;
     }
 
+    public long getReadLockMinLength() {
+        return readLockMinLength;
+    }
+
+    public void setReadLockMinLength(long readLockMinLength) {
+        this.readLockMinLength = readLockMinLength;
+    }
+
     public int getBufferSize() {
         return bufferSize;
     }
@@ -724,6 +733,7 @@ public abstract class GenericFileEndpoin
         if (readLockTimeout > 0) {
             params.put("readLockTimeout", readLockTimeout);
         }
+        params.put("readLockMinLength", readLockMinLength);
 
         return params;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java?rev=1373751&r1=1373750&r2=1373751&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java Thu Aug 16 08:27:47 2012
@@ -33,6 +33,7 @@ public class FileChangedExclusiveReadLoc
     private static final transient Logger LOG = LoggerFactory.getLogger(FileChangedExclusiveReadLockStrategy.class);
     private long timeout;
     private long checkInterval = 1000;
+    private long minLength = 1;
 
     public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
         // must call super
@@ -66,8 +67,7 @@ public class FileChangedExclusiveReadLoc
             LOG.trace("Previous last modified: {}, new last modified: {}", lastModified, newLastModified);
             LOG.trace("Previous length: {}, new length: {}", length, newLength);
 
-            if (newLastModified == lastModified && newLength == length && length != 0) {
-                // We consider that zero-length files are files in progress
+            if (length >= minLength && (newLastModified == lastModified && newLength == length)) {
                 LOG.trace("Read lock acquired.");
                 exclusive = true;
             } else {
@@ -113,4 +113,11 @@ public class FileChangedExclusiveReadLoc
         this.checkInterval = checkInterval;
     }
 
+    public long getMinLength() {
+        return minLength;
+    }
+
+    public void setMinLength(long minLength) {
+        this.minLength = minLength;
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java?rev=1373751&r1=1373750&r2=1373751&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java Thu Aug 16 08:27:47 2012
@@ -132,7 +132,7 @@ public final class FileProcessStrategyFa
                 }
                 return readLockStrategy;
             } else if ("changed".equals(readLock)) {
-                GenericFileExclusiveReadLockStrategy<File> readLockStrategy = new FileChangedExclusiveReadLockStrategy();
+                FileChangedExclusiveReadLockStrategy readLockStrategy = new FileChangedExclusiveReadLockStrategy();
                 Long timeout = (Long) params.get("readLockTimeout");
                 if (timeout != null) {
                     readLockStrategy.setTimeout(timeout);
@@ -141,6 +141,10 @@ public final class FileProcessStrategyFa
                 if (checkInterval != null) {
                     readLockStrategy.setCheckInterval(checkInterval);
                 }
+                Long minLength = (Long) params.get("readLockMinLength");
+                if (minLength != null) {
+                    readLockStrategy.setMinLength(minLength);
+                }
                 return readLockStrategy;
             } else if ("markerFile".equals(readLock)) {
                 return new MarkerFileExclusiveReadLockStrategy();

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedZeroLengthReadLockTest.java (from r1373737, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedZeroLengthReadLockTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedZeroLengthReadLockTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java&r1=1373737&r2=1373751&rev=1373751&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedReadLockTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/strategy/FileChangedZeroLengthReadLockTest.java Thu Aug 16 08:27:47 2012
@@ -16,21 +16,16 @@
  */
 package org.apache.camel.component.file.strategy;
 
-import java.io.File;
 import java.io.FileOutputStream;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * @version
  */
-public class FileChangedReadLockTest extends ContextTestSupport {
-
-    private static final transient Logger LOG = LoggerFactory.getLogger(FileChangedReadLockTest.class);
+public class FileChangedZeroLengthReadLockTest extends ContextTestSupport {
 
     @Override
     protected void setUp() throws Exception {
@@ -42,33 +37,17 @@ public class FileChangedReadLockTest ext
     public void testChangedReadLock() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
-        mock.expectedFileExists("target/changed/out/slowfile.dat");
+        mock.expectedFileExists("target/changed/out/zerofile.dat");
 
-        writeSlowFile();
+        writeZeroLengthFile();
 
         assertMockEndpointsSatisfied();
-
-        String content = context.getTypeConverter().convertTo(String.class, new File("target/changed/out/slowfile.dat").getAbsoluteFile());
-        String[] lines = content.split(LS);
-        assertEquals("There should be 20 lines in the file", 20, lines.length);
-        for (int i = 0; i < 20; i++) {
-            assertEquals("Line " + i, lines[i]);
-        }
     }
 
-    private void writeSlowFile() throws Exception {
-        LOG.debug("Writing slow file...");
-
-        FileOutputStream fos = new FileOutputStream("target/changed/in/slowfile.dat");
-        for (int i = 0; i < 20; i++) {
-            fos.write(("Line " + i + LS).getBytes());
-            LOG.debug("Writing line " + i);
-            Thread.sleep(200);
-        }
-
+    private void writeZeroLengthFile() throws Exception {
+        FileOutputStream fos = new FileOutputStream("target/changed/in/zerofile.dat");
         fos.flush();
         fos.close();
-        LOG.debug("Writing slow file DONE...");
     }
 
     @Override
@@ -76,7 +55,7 @@ public class FileChangedReadLockTest ext
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("file:target/changed/in?readLock=changed").to("file:target/changed/out", "mock:result");
+                from("file:target/changed/in?readLock=changed&readLockMinLength=0").to("file:target/changed/out", "mock:result");
             }
         };
     }

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java?rev=1373751&r1=1373750&r2=1373751&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java Thu Aug 16 08:27:47 2012
@@ -32,6 +32,7 @@ public class FtpChangedExclusiveReadLock
     private static final transient Logger LOG = LoggerFactory.getLogger(FtpChangedExclusiveReadLockStrategy.class);
     private long timeout;
     private long checkInterval = 5000;
+    private long minLength = 1;
 
     @Override
     public void prepareOnStartup(GenericFileOperations<FTPFile> tGenericFileOperations, GenericFileEndpoint<FTPFile> tGenericFileEndpoint) throws Exception {
@@ -71,8 +72,7 @@ public class FtpChangedExclusiveReadLock
             LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified);
             LOG.trace("Previous length: " + length + ", new length: " + newLength);
 
-            if (newLastModified == lastModified && newLength == length && length != 0) {
-                // We consider that zero-length files are files in progress on some FTP servers
+            if (length >= minLength && (newLastModified == lastModified && newLength == length)) {
                 LOG.trace("Read lock acquired.");
                 exclusive = true;
             } else {
@@ -123,4 +123,11 @@ public class FtpChangedExclusiveReadLock
         this.checkInterval = checkInterval;
     }
 
+    public long getMinLength() {
+        return minLength;
+    }
+
+    public void setMinLength(long minLength) {
+        this.minLength = minLength;
+    }
 }
\ No newline at end of file

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java?rev=1373751&r1=1373750&r2=1373751&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java Thu Aug 16 08:27:47 2012
@@ -113,7 +113,7 @@ public final class FtpProcessStrategyFac
                 }
                 return readLockStrategy;
             } else if ("changed".equals(readLock)) {
-                GenericFileExclusiveReadLockStrategy<FTPFile> readLockStrategy = new FtpChangedExclusiveReadLockStrategy();
+                FtpChangedExclusiveReadLockStrategy readLockStrategy = new FtpChangedExclusiveReadLockStrategy();
                 Long timeout = (Long) params.get("readLockTimeout");
                 if (timeout != null) {
                     readLockStrategy.setTimeout(timeout);
@@ -122,6 +122,10 @@ public final class FtpProcessStrategyFac
                 if (checkInterval != null) {
                     readLockStrategy.setCheckInterval(checkInterval);
                 }
+                Long minLength = (Long) params.get("readLockMinLength");
+                if (minLength != null) {
+                    readLockStrategy.setMinLength(minLength);
+                }
                 return readLockStrategy;
             }
         }

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java?rev=1373751&r1=1373750&r2=1373751&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java Thu Aug 16 08:27:47 2012
@@ -32,6 +32,7 @@ public class SftpChangedExclusiveReadLoc
     private static final transient Logger LOG = LoggerFactory.getLogger(SftpChangedExclusiveReadLockStrategy.class);
     private long timeout;
     private long checkInterval = 5000;
+    private long minLength = 1;
 
     @Override
     public void prepareOnStartup(GenericFileOperations<ChannelSftp.LsEntry> tGenericFileOperations, GenericFileEndpoint<ChannelSftp.LsEntry> tGenericFileEndpoint) throws Exception {
@@ -71,8 +72,7 @@ public class SftpChangedExclusiveReadLoc
             LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified);
             LOG.trace("Previous length: " + length + ", new length: " + newLength);
 
-            if (newLastModified == lastModified && newLength == length && length != 0) {
-                // We consider that zero-length files are files in progress on some FTP servers
+            if (length >= minLength && (newLastModified == lastModified && newLength == length)) {
                 LOG.trace("Read lock acquired.");
                 exclusive = true;
             } else {
@@ -123,4 +123,11 @@ public class SftpChangedExclusiveReadLoc
         this.checkInterval = checkInterval;
     }
 
+    public long getMinLength() {
+        return minLength;
+    }
+
+    public void setMinLength(long minLength) {
+        this.minLength = minLength;
+    }
 }
\ No newline at end of file

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java?rev=1373751&r1=1373750&r2=1373751&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java Thu Aug 16 08:27:47 2012
@@ -18,6 +18,7 @@ package org.apache.camel.component.file.
 
 import java.util.Map;
 
+import com.jcraft.jsch.ChannelSftp;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Expression;
 import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
@@ -35,7 +36,7 @@ public final class SftpProcessStrategyFa
     }
 
     @SuppressWarnings("unchecked")
-    public static <LsEntry> GenericFileProcessStrategy<LsEntry> createGenericFileProcessStrategy(CamelContext context, Map<String, Object> params) {
+    public static GenericFileProcessStrategy<ChannelSftp.LsEntry> createGenericFileProcessStrategy(CamelContext context, Map<String, Object> params) {
 
         // We assume a value is present only if its value not null for String and 'true' for boolean
         Expression moveExpression = (Expression) params.get("move");
@@ -46,52 +47,52 @@ public final class SftpProcessStrategyFa
         boolean isMove = moveExpression != null || preMoveExpression != null || moveFailedExpression != null;
 
         if (isDelete) {
-            GenericFileDeleteProcessStrategy<LsEntry> strategy = new GenericFileDeleteProcessStrategy<LsEntry>();
-            strategy.setExclusiveReadLockStrategy((GenericFileExclusiveReadLockStrategy<LsEntry>) getExclusiveReadLockStrategy(params));
+            GenericFileDeleteProcessStrategy<ChannelSftp.LsEntry> strategy = new GenericFileDeleteProcessStrategy<ChannelSftp.LsEntry>();
+            strategy.setExclusiveReadLockStrategy((GenericFileExclusiveReadLockStrategy<ChannelSftp.LsEntry>) getExclusiveReadLockStrategy(params));
             if (preMoveExpression != null) {
-                GenericFileExpressionRenamer<LsEntry> renamer = new GenericFileExpressionRenamer<LsEntry>();
+                GenericFileExpressionRenamer<ChannelSftp.LsEntry> renamer = new GenericFileExpressionRenamer<ChannelSftp.LsEntry>();
                 renamer.setExpression(preMoveExpression);
                 strategy.setBeginRenamer(renamer);
             }
             if (moveFailedExpression != null) {
-                GenericFileExpressionRenamer<LsEntry> renamer = new GenericFileExpressionRenamer<LsEntry>();
+                GenericFileExpressionRenamer<ChannelSftp.LsEntry> renamer = new GenericFileExpressionRenamer<ChannelSftp.LsEntry>();
                 renamer.setExpression(moveFailedExpression);
                 strategy.setFailureRenamer(renamer);
             }
             return strategy;
         } else if (isMove || isNoop) {
-            GenericFileRenameProcessStrategy<LsEntry> strategy = new GenericFileRenameProcessStrategy<LsEntry>();
-            strategy.setExclusiveReadLockStrategy((GenericFileExclusiveReadLockStrategy<LsEntry>) getExclusiveReadLockStrategy(params));
+            GenericFileRenameProcessStrategy<ChannelSftp.LsEntry> strategy = new GenericFileRenameProcessStrategy<ChannelSftp.LsEntry>();
+            strategy.setExclusiveReadLockStrategy((GenericFileExclusiveReadLockStrategy<ChannelSftp.LsEntry>) getExclusiveReadLockStrategy(params));
             if (!isNoop && moveExpression != null) {
                 // move on commit is only possible if not noop
-                GenericFileExpressionRenamer<LsEntry> renamer = new GenericFileExpressionRenamer<LsEntry>();
+                GenericFileExpressionRenamer<ChannelSftp.LsEntry> renamer = new GenericFileExpressionRenamer<ChannelSftp.LsEntry>();
                 renamer.setExpression(moveExpression);
                 strategy.setCommitRenamer(renamer);
             }
             // both move and noop supports pre move
             if (moveFailedExpression != null) {
-                GenericFileExpressionRenamer<LsEntry> renamer = new GenericFileExpressionRenamer<LsEntry>();
+                GenericFileExpressionRenamer<ChannelSftp.LsEntry> renamer = new GenericFileExpressionRenamer<ChannelSftp.LsEntry>();
                 renamer.setExpression(moveFailedExpression);
                 strategy.setFailureRenamer(renamer);
             }
             // both move and noop supports pre move
             if (preMoveExpression != null) {
-                GenericFileExpressionRenamer<LsEntry> renamer = new GenericFileExpressionRenamer<LsEntry>();
+                GenericFileExpressionRenamer<ChannelSftp.LsEntry> renamer = new GenericFileExpressionRenamer<ChannelSftp.LsEntry>();
                 renamer.setExpression(preMoveExpression);
                 strategy.setBeginRenamer(renamer);
             }
             return strategy;
         } else {
             // default strategy will do nothing
-            GenericFileNoOpProcessStrategy<LsEntry> strategy = new GenericFileNoOpProcessStrategy<LsEntry>();
-            strategy.setExclusiveReadLockStrategy((GenericFileExclusiveReadLockStrategy<LsEntry>) getExclusiveReadLockStrategy(params));
+            GenericFileNoOpProcessStrategy<ChannelSftp.LsEntry> strategy = new GenericFileNoOpProcessStrategy<ChannelSftp.LsEntry>();
+            strategy.setExclusiveReadLockStrategy((GenericFileExclusiveReadLockStrategy<ChannelSftp.LsEntry>) getExclusiveReadLockStrategy(params));
             return strategy;
         }
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    private static <LsEntry> GenericFileExclusiveReadLockStrategy<LsEntry> getExclusiveReadLockStrategy(Map<String, Object> params) {
-        GenericFileExclusiveReadLockStrategy<LsEntry> strategy = (GenericFileExclusiveReadLockStrategy<LsEntry>) params.get("exclusiveReadLockStrategy");
+    private static GenericFileExclusiveReadLockStrategy<ChannelSftp.LsEntry> getExclusiveReadLockStrategy(Map<String, Object> params) {
+        GenericFileExclusiveReadLockStrategy<ChannelSftp.LsEntry> strategy = (GenericFileExclusiveReadLockStrategy<ChannelSftp.LsEntry>) params.get("exclusiveReadLockStrategy");
         if (strategy != null) {
             return strategy;
         }
@@ -102,7 +103,7 @@ public final class SftpProcessStrategyFa
             if ("none".equals(readLock) || "false".equals(readLock)) {
                 return null;
             } else if ("rename".equals(readLock)) {
-                GenericFileRenameExclusiveReadLockStrategy<LsEntry> readLockStrategy = new GenericFileRenameExclusiveReadLockStrategy<LsEntry>();
+                GenericFileRenameExclusiveReadLockStrategy<ChannelSftp.LsEntry> readLockStrategy = new GenericFileRenameExclusiveReadLockStrategy<ChannelSftp.LsEntry>();
                 Long timeout = (Long) params.get("readLockTimeout");
                 if (timeout != null) {
                     readLockStrategy.setTimeout(timeout);
@@ -113,7 +114,7 @@ public final class SftpProcessStrategyFa
                 }
                 return readLockStrategy;
             } else if ("changed".equals(readLock)) {
-                GenericFileExclusiveReadLockStrategy readLockStrategy = new SftpChangedExclusiveReadLockStrategy();
+                SftpChangedExclusiveReadLockStrategy readLockStrategy = new SftpChangedExclusiveReadLockStrategy();
                 Long timeout = (Long) params.get("readLockTimeout");
                 if (timeout != null) {
                     readLockStrategy.setTimeout(timeout);
@@ -122,6 +123,10 @@ public final class SftpProcessStrategyFa
                 if (checkInterval != null) {
                     readLockStrategy.setCheckInterval(checkInterval);
                 }
+                Long minLength = (Long) params.get("readLockMinLength");
+                if (minLength != null) {
+                    readLockStrategy.setMinLength(minLength);
+                }
                 return readLockStrategy;
             }
         }

Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedZeroLengthReadLockTest.java (from r1373737, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedZeroLengthReadLockTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedZeroLengthReadLockTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockTest.java&r1=1373737&r2=1373751&rev=1373751&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedReadLockTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpChangedZeroLengthReadLockTest.java Thu Aug 16 08:27:47 2012
@@ -16,58 +16,37 @@
  */
 package org.apache.camel.component.file.remote;
 
-import java.io.File;
 import java.io.FileOutputStream;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  *
  */
-public class FtpChangedReadLockTest extends FtpServerTestSupport {
-
-    private static final transient Logger LOG = LoggerFactory.getLogger(FtpChangedReadLockTest.class);
+public class FtpChangedZeroLengthReadLockTest extends FtpServerTestSupport {
 
     protected String getFtpUrl() {
-        return "ftp://admin@localhost:" + getPort() + "/changed?password=admin&readLock=changed&readLockCheckInterval=1000&delete=true";
+        return "ftp://admin@localhost:" + getPort() + "/changed?password=admin&readLock=changed&readLockCheckInterval=1000&readLockMinLength=0&delete=true";
     }
 
     @Test
     public void testChangedReadLock() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
-        mock.expectedFileExists("target/changed/out/slowfile.dat");
+        mock.expectedFileExists("target/changed/out/zerofile.dat");
 
-        writeSlowFile();
+        writeZeroFile();
 
         assertMockEndpointsSatisfied();
-
-        String content = context.getTypeConverter().convertTo(String.class, new File("target/changed/out/slowfile.dat").getAbsoluteFile());
-        String[] lines = content.split(LS);
-        assertEquals("There should be 20 lines in the file", 20, lines.length);
-        for (int i = 0; i < 20; i++) {
-            assertEquals("Line " + i, lines[i]);
-        }
     }
 
-    private void writeSlowFile() throws Exception {
-        LOG.debug("Writing slow file...");
-
+    private void writeZeroFile() throws Exception {
         createDirectory(FTP_ROOT_DIR + "/changed");
-        FileOutputStream fos = new FileOutputStream(FTP_ROOT_DIR + "/changed/slowfile.dat", true);
-        for (int i = 0; i < 20; i++) {
-            fos.write(("Line " + i + LS).getBytes());
-            LOG.debug("Writing line " + i);
-            Thread.sleep(200);
-        }
-
+        FileOutputStream fos = new FileOutputStream(FTP_ROOT_DIR + "/changed/zerofile.dat", true);
         fos.flush();
         fos.close();
-        LOG.debug("Writing slow file DONE...");
     }
 
     @Override