You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/06/02 21:40:23 UTC

svn commit: r1345562 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/component/file/strategy/ camel-core/src/test/java/org/apache/camel/component/file/ components/camel-ftp/src/main...

Author: davsclaus
Date: Sat Jun  2 19:40:22 2012
New Revision: 1345562

URL: http://svn.apache.org/viewvc?rev=1345562&view=rev
Log:
CAMEL-5324: Improved file component to use marker file in read locks to support clustered Camel apps, with multiple file consumers competing for files on a shared drive.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameReadLockMustUseMarkerFileTest.java
      - copied, changed from r1345498, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileDeleteOldLockFilesTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sat Jun  2 19:40:22 2012
@@ -285,9 +285,14 @@ public abstract class GenericFileConsume
 
             boolean begin = processStrategy.begin(operations, endpoint, exchange, file);
             if (!begin) {
-                log.debug(endpoint + " cannot begin processing file: {}", file);
-                // begin returned false, so remove file from the in progress list as its no longer in progress
-                endpoint.getInProgressRepository().remove(absoluteFileName);
+                log.debug("{} cannot begin processing file: {}", endpoint, file);
+                try {
+                    // abort
+                    processStrategy.abort(operations, endpoint, exchange, file);
+                } finally {
+                    // begin returned false, so remove file from the in progress list as its no longer in progress
+                    endpoint.getInProgressRepository().remove(absoluteFileName);
+                }
                 return;
             }
         } catch (Exception e) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java Sat Jun  2 19:40:22 2012
@@ -49,6 +49,20 @@ public interface GenericFileProcessStrat
                   Exchange exchange, GenericFile<T> file) throws Exception;
 
     /**
+     * Called when a begin is aborted, for example to release any resources which may have
+     * been acquired during the {@link #begin(GenericFileOperations, GenericFileEndpoint, org.apache.camel.Exchange, GenericFile)}
+     * operation.
+     *
+     * @param operations file operations
+     * @param endpoint   the endpoint
+     * @param exchange   the exchange
+     * @param file       the file
+     * @throws Exception can be thrown in case of errors
+     */
+    void abort(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint,
+               Exchange exchange, GenericFile<T> file) throws Exception;
+
+    /**
      * Releases any file locks and possibly deletes or moves the file after
      * successful processing
      *

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java Sat Jun  2 19:40:22 2012
@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.component.file.GenericFile;
-import org.apache.camel.component.file.GenericFileEndpoint;
 import org.apache.camel.component.file.GenericFileOperations;
 import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
@@ -36,69 +35,54 @@ public class FileChangedExclusiveReadLoc
     private long timeout;
     private long checkInterval = 1000;
 
-    @Override
-    public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
-        // noop
-    }
-
     public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+        // must call super
+        if (!super.acquireExclusiveReadLock(operations, file, exchange)) {
+            System.out.println("XXXX");
+            return false;
+        }
+
         File target = new File(file.getAbsoluteFilePath());
         boolean exclusive = false;
 
         LOG.trace("Waiting for exclusive read lock to file: {}", file);
 
-        try {
-            long lastModified = Long.MIN_VALUE;
-            long length = Long.MIN_VALUE;
-            StopWatch watch = new StopWatch();
-
-            while (!exclusive) {
-                // timeout check
-                if (timeout > 0) {
-                    long delta = watch.taken();
-                    if (delta > timeout) {
-                        LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
-                        // we could not get the lock within the timeout period, so return false
-                        return false;
-                    }
+        long lastModified = Long.MIN_VALUE;
+        long length = Long.MIN_VALUE;
+        StopWatch watch = new StopWatch();
+
+        while (!exclusive) {
+            // timeout check
+            if (timeout > 0) {
+                long delta = watch.taken();
+                if (delta > timeout) {
+                    LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
+                    // we could not get the lock within the timeout period, so return false
+                    return false;
                 }
+            }
 
-                long newLastModified = target.lastModified();
-                long newLength = target.length();
+            long newLastModified = target.lastModified();
+            long newLength = target.length();
 
-                LOG.trace("Previous last modified: {}, new last modified: {}", lastModified, newLastModified);
-                LOG.trace("Previous length: {}, new length: {}", length, newLength);
+            LOG.trace("Previous last modified: {}, new last modified: {}", lastModified, newLastModified);
+            LOG.trace("Previous length: {}, new length: {}", length, newLength);
 
-                if (newLastModified == lastModified && newLength == length) {
-                    // let super handle the last part of acquiring the lock now the file is not
-                    // currently being in progress of being copied as file length and modified
-                    // are stable
-                    exclusive = super.acquireExclusiveReadLock(operations, file, exchange);
-                } else {
-                    // set new base file change information
-                    lastModified = newLastModified;
-                    length = newLength;
-
-                    boolean interrupted = sleep();
-                    if (interrupted) {
-                        // we were interrupted while sleeping, we are likely being shutdown so return false
-                        return false;
-                    }
+            if (newLastModified == lastModified && newLength == length && length != 0) {
+                // We consider that zero-length files are files in progress
+                LOG.trace("Read lock acquired.");
+                exclusive = true;
+            } else {
+                // set new base file change information
+                lastModified = newLastModified;
+                length = newLength;
+
+                boolean interrupted = sleep();
+                if (interrupted) {
+                    // we were interrupted while sleeping, we are likely being shutdown so return false
+                    return false;
                 }
             }
-        } catch (IOException e) {
-            // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file
-            // such as AntiVirus or MS Office that has special locks for it's supported files
-            if (timeout == 0) {
-                // if not using timeout, then we cant retry, so rethrow
-                throw e;
-            }
-            LOG.debug("Cannot acquire read lock. Will try again.", e);
-            boolean interrupted = sleep();
-            if (interrupted) {
-                // we were interrupted while sleeping, we are likely being shutdown so return false
-                return false;
-            }
         }
 
         return exclusive;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java Sat Jun  2 19:40:22 2012
@@ -26,7 +26,6 @@ import java.nio.channels.FileLock;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileEndpoint;
-import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
 import org.apache.camel.component.file.GenericFileOperations;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.StopWatch;
@@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory;
  * After granting the read lock it is released, we just want to make sure that when we start
  * consuming the file its not currently in progress of being written by third party.
  */
-public class FileLockExclusiveReadLockStrategy implements GenericFileExclusiveReadLockStrategy<File> {
+public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLockStrategy {
     private static final transient Logger LOG = LoggerFactory.getLogger(FileLockExclusiveReadLockStrategy.class);
     private long timeout;
     private long checkInterval = 1000;
@@ -50,6 +49,11 @@ public class FileLockExclusiveReadLockSt
     }
 
     public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+        // must call super
+        if (!super.acquireExclusiveReadLock(operations, file, exchange)) {
+            return false;
+        }
+
         File target = new File(file.getAbsoluteFilePath());
 
         LOG.trace("Waiting for exclusive read lock to file: {}", target);
@@ -110,6 +114,10 @@ public class FileLockExclusiveReadLockSt
 
     public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
                                          GenericFile<File> file, Exchange exchange) throws Exception {
+
+        // must call super
+        super.releaseExclusiveReadLock(operations, file, exchange);
+
         if (lock != null) {
             Channel channel = lock.channel();
             try {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java Sat Jun  2 19:40:22 2012
@@ -121,7 +121,7 @@ public final class FileProcessStrategyFa
                 }
                 return readLockStrategy;
             } else if ("rename".equals(readLock)) {
-                GenericFileExclusiveReadLockStrategy<File> readLockStrategy = new GenericFileRenameExclusiveReadLockStrategy<File>();
+                GenericFileExclusiveReadLockStrategy<File> readLockStrategy = new FileRenameExclusiveReadLockStrategy();
                 Long timeout = (Long) params.get("readLockTimeout");
                 if (timeout != null) {
                     readLockStrategy.setTimeout(timeout);

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java?rev=1345562&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java Sat Jun  2 19:40:22 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileOperations;
+
+/**
+ * Acquires exclusive read lock to the given file. Will wait until the lock is granted.
+ * After granting the read lock it is released, we just want to make sure that when we start
+ * consuming the file its not currently in progress of being written by third party.
+ * <p/>
+ * This implementation is only supported by the File component, that leverages the {@link MarkerFileExclusiveReadLockStrategy}
+ * as well, to ensure only acquiring locks on files, which is not already in progress by another process,
+ * that have marked this using the marker file.
+ */
+public class FileRenameExclusiveReadLockStrategy extends GenericFileRenameExclusiveReadLockStrategy<File> {
+
+    private MarkerFileExclusiveReadLockStrategy marker = new MarkerFileExclusiveReadLockStrategy();
+
+    @Override
+    public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+        // must call marker first
+        if (!marker.acquireExclusiveReadLock(operations, file, exchange)) {
+            return false;
+        }
+
+        return super.acquireExclusiveReadLock(operations, file, exchange);
+    }
+
+    @Override
+    public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
+                                         GenericFile<File> file, Exchange exchange) throws Exception {
+        // must call marker first
+        try {
+            marker.releaseExclusiveReadLock(operations, file, exchange);
+        } finally {
+            super.releaseExclusiveReadLock(operations, file, exchange);
+        }
+    }
+
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java Sat Jun  2 19:40:22 2012
@@ -30,6 +30,9 @@ import org.apache.camel.util.FileUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Base class for implementations of {@link GenericFileProcessStrategy}.
+ */
 public abstract class GenericFileProcessStrategySupport<T> implements GenericFileProcessStrategy<T> {
     protected final transient Logger log = LoggerFactory.getLogger(getClass());
     protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
@@ -53,6 +56,14 @@ public abstract class GenericFileProcess
         return true;
     }
 
+    public void abort(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
+        if (exclusiveReadLockStrategy != null) {
+            exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
+        }
+
+        deleteLocalWorkFile(exchange);
+    }
+
     public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
         if (exclusiveReadLockStrategy != null) {
             exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java Sat Jun  2 19:40:22 2012
@@ -83,6 +83,11 @@ public class FileBeginFailureOneTimeTest
             return true;
         }
 
+        public void abort(GenericFileOperations<File> fileGenericFileOperations, GenericFileEndpoint<File> fileGenericFileEndpoint,
+                          Exchange exchange, GenericFile<File> fileGenericFile) throws Exception {
+            // noop
+        }
+
         public void commit(GenericFileOperations<File> fileGenericFileOperations, GenericFileEndpoint<File> fileGenericFileEndpoint,
                             Exchange exchange, GenericFile<File> fileGenericFile) throws Exception {
         }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameReadLockMustUseMarkerFileTest.java (from r1345498, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileDeleteOldLockFilesTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameReadLockMustUseMarkerFileTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameReadLockMustUseMarkerFileTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileDeleteOldLockFilesTest.java&r1=1345498&r2=1345562&rev=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileDeleteOldLockFilesTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameReadLockMustUseMarkerFileTest.java Sat Jun  2 19:40:22 2012
@@ -16,25 +16,27 @@
  */
 package org.apache.camel.component.file;
 
+import java.io.File;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
  * @version 
  */
-public class FileMarkerFileDeleteOldLockFilesTest extends ContextTestSupport {
+public class FileRenameReadLockMustUseMarkerFileTest extends ContextTestSupport {
 
     @Override
     protected void setUp() throws Exception {
         super.setUp();
-        deleteDirectory("target/oldlock");
-        template.sendBodyAndHeader("file:target/oldlock", "locked", Exchange.FILE_NAME, "hello.txt" + FileComponent.DEFAULT_LOCK_FILE_POSTFIX);
-        template.sendBodyAndHeader("file:target/oldlock", "Bye World", Exchange.FILE_NAME, "bye.txt");
+        deleteDirectory("target/rename");
+        template.sendBodyAndHeader("file:target/rename", "Bye World", Exchange.FILE_NAME, "bye.txt");
     }
 
-    public void testDeleteOldLockOnStartup() throws Exception {
+    public void testCamelLockFile() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
         mock.expectedBodiesReceived("Bye World");
@@ -44,6 +46,12 @@ public class FileMarkerFileDeleteOldLock
         context.startRoute("foo");
 
         assertMockEndpointsSatisfied();
+
+        assertTrue(oneExchangeDone.matchesMockWaitTime());
+
+        // and lock file should be deleted
+        File lock = new File("target/rename/bye.txt" + FileComponent.DEFAULT_LOCK_FILE_POSTFIX);
+        assertFalse("Lock file should not exist: " + lock, lock.exists());
     }
 
     @Override
@@ -51,7 +59,18 @@ public class FileMarkerFileDeleteOldLock
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("file:target/oldlock").routeId("foo").noAutoStartup()
+                from("file:target/rename?readLock=rename").routeId("foo").noAutoStartup()
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                // got a file, so we should have a .camelLock file as well
+                                String name = exchange.getIn().getHeader(Exchange.FILE_PATH) + FileComponent.DEFAULT_LOCK_FILE_POSTFIX;
+                                File lock = new File(name);
+
+                                // lock file should exist
+                                assertTrue("Lock file should exist: " + name, lock.exists());
+                            }
+                        })
                         .convertBodyTo(String.class).to("mock:result");
             }
         };

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java Sat Jun  2 19:40:22 2012
@@ -43,64 +43,49 @@ public class FtpChangedExclusiveReadLock
 
         LOG.trace("Waiting for exclusive read lock to file: " + file);
 
-        try {
-            long lastModified = Long.MIN_VALUE;
-            long length = Long.MIN_VALUE;
-            StopWatch watch = new StopWatch();
-
-            while (!exclusive) {
-                // timeout check
-                if (timeout > 0) {
-                    long delta = watch.taken();
-                    if (delta > timeout) {
-                        LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
-                        // we could not get the lock within the timeout period, so return false
-                        return false;
-                    }
+        long lastModified = Long.MIN_VALUE;
+        long length = Long.MIN_VALUE;
+        StopWatch watch = new StopWatch();
+
+        while (!exclusive) {
+            // timeout check
+            if (timeout > 0) {
+                long delta = watch.taken();
+                if (delta > timeout) {
+                    LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
+                    // we could not get the lock within the timeout period, so return false
+                    return false;
                 }
+            }
 
-                long newLastModified = 0;
-                long newLength = 0;
-                List<FTPFile> files = operations.listFiles(file.getParent());
-                for (FTPFile f : files) {
-                    if (f.getName().equals(file.getFileName())) {
-                        newLastModified = f.getTimestamp().getTimeInMillis();
-                        newLength = f.getSize();
-                    }
+            long newLastModified = 0;
+            long newLength = 0;
+            List<FTPFile> files = operations.listFiles(file.getParent());
+            for (FTPFile f : files) {
+                if (f.getName().equals(file.getFileName())) {
+                    newLastModified = f.getTimestamp().getTimeInMillis();
+                    newLength = f.getSize();
                 }
+            }
 
-                LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified);
-                LOG.trace("Previous length: " + length + ", new length: " + newLength);
+            LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified);
+            LOG.trace("Previous length: " + length + ", new length: " + newLength);
 
-                if (newLastModified == lastModified && newLength == length && length != 0) {
-                    // We consider that zero-length files are files in progress on some FTP servers
-                    LOG.trace("Read lock acquired.");
-                    exclusive = true;
-                } else {
-                    // set new base file change information
-                    lastModified = newLastModified;
-                    length = newLength;
-
-                    boolean interrupted = sleep();
-                    if (interrupted) {
-                        // we were interrupted while sleeping, we are likely being shutdown so return false
-                        return false;
-                    }
+            if (newLastModified == lastModified && newLength == length && length != 0) {
+                // We consider that zero-length files are files in progress on some FTP servers
+                LOG.trace("Read lock acquired.");
+                exclusive = true;
+            } else {
+                // set new base file change information
+                lastModified = newLastModified;
+                length = newLength;
+
+                boolean interrupted = sleep();
+                if (interrupted) {
+                    // we were interrupted while sleeping, we are likely being shutdown so return false
+                    return false;
                 }
             }
-        } catch (Exception e) {
-            // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file
-            // such as AntiVirus or MS Office that has special locks for it's supported files
-            if (timeout == 0) {
-                // if not using timeout, then we cant retry, so rethrow
-                throw e;
-            }
-            LOG.debug("Cannot acquire read lock. Will try again.", e);
-            boolean interrupted = sleep();
-            if (interrupted) {
-                // we were interrupted while sleeping, we are likely being shutdown so return false
-                return false;
-            }
         }
 
         return exclusive;

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java Sat Jun  2 19:40:22 2012
@@ -107,6 +107,10 @@ public final class FtpProcessStrategyFac
                 if (timeout != null) {
                     readLockStrategy.setTimeout(timeout);
                 }
+                Long checkInterval = (Long) params.get("readLockCheckInterval");
+                if (checkInterval != null) {
+                    readLockStrategy.setCheckInterval(checkInterval);
+                }
                 return readLockStrategy;
             } else if ("changed".equals(readLock)) {
                 GenericFileExclusiveReadLockStrategy<FTPFile> readLockStrategy = new FtpChangedExclusiveReadLockStrategy();

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java Sat Jun  2 19:40:22 2012
@@ -43,64 +43,49 @@ public class SftpChangedExclusiveReadLoc
 
         LOG.trace("Waiting for exclusive read lock to file: " + file);
 
-        try {
-            long lastModified = Long.MIN_VALUE;
-            long length = Long.MIN_VALUE;
-            StopWatch watch = new StopWatch();
-
-            while (!exclusive) {
-                // timeout check
-                if (timeout > 0) {
-                    long delta = watch.taken();
-                    if (delta > timeout) {
-                        LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
-                        // we could not get the lock within the timeout period, so return false
-                        return false;
-                    }
+        long lastModified = Long.MIN_VALUE;
+        long length = Long.MIN_VALUE;
+        StopWatch watch = new StopWatch();
+
+        while (!exclusive) {
+            // timeout check
+            if (timeout > 0) {
+                long delta = watch.taken();
+                if (delta > timeout) {
+                    LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
+                    // we could not get the lock within the timeout period, so return false
+                    return false;
                 }
+            }
 
-                long newLastModified = 0;
-                long newLength = 0;
-                List<ChannelSftp.LsEntry> files = operations.listFiles(file.getParent());
-                for (ChannelSftp.LsEntry f : files) {
-                    if (f.getFilename().equals(file.getFileName())) {
-                        newLastModified = f.getAttrs().getMTime();
-                        newLength = f.getAttrs().getSize();
-                    }
+            long newLastModified = 0;
+            long newLength = 0;
+            List<ChannelSftp.LsEntry> files = operations.listFiles(file.getParent());
+            for (ChannelSftp.LsEntry f : files) {
+                if (f.getFilename().equals(file.getFileName())) {
+                    newLastModified = f.getAttrs().getMTime();
+                    newLength = f.getAttrs().getSize();
                 }
+            }
 
-                LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified);
-                LOG.trace("Previous length: " + length + ", new length: " + newLength);
+            LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified);
+            LOG.trace("Previous length: " + length + ", new length: " + newLength);
 
-                if (newLastModified == lastModified && newLength == length && length != 0) {
-                    // We consider that zero-length files are files in progress on some FTP servers
-                    LOG.trace("Read lock acquired.");
-                    exclusive = true;
-                } else {
-                    // set new base file change information
-                    lastModified = newLastModified;
-                    length = newLength;
-
-                    boolean interrupted = sleep();
-                    if (interrupted) {
-                        // we were interrupted while sleeping, we are likely being shutdown so return false
-                        return false;
-                    }
+            if (newLastModified == lastModified && newLength == length && length != 0) {
+                // We consider that zero-length files are files in progress on some FTP servers
+                LOG.trace("Read lock acquired.");
+                exclusive = true;
+            } else {
+                // set new base file change information
+                lastModified = newLastModified;
+                length = newLength;
+
+                boolean interrupted = sleep();
+                if (interrupted) {
+                    // we were interrupted while sleeping, we are likely being shutdown so return false
+                    return false;
                 }
             }
-        } catch (Exception e) {
-            // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file
-            // such as AntiVirus or MS Office that has special locks for it's supported files
-            if (timeout == 0) {
-                // if not using timeout, then we cant retry, so rethrow
-                throw e;
-            }
-            LOG.debug("Cannot acquire read lock. Will try again.", e);
-            boolean interrupted = sleep();
-            if (interrupted) {
-                // we were interrupted while sleeping, we are likely being shutdown so return false
-                return false;
-            }
         }
 
         return exclusive;

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java Sat Jun  2 19:40:22 2012
@@ -107,6 +107,10 @@ public final class SftpProcessStrategyFa
                 if (timeout != null) {
                     readLockStrategy.setTimeout(timeout);
                 }
+                Long checkInterval = (Long) params.get("readLockCheckInterval");
+                if (checkInterval != null) {
+                    readLockStrategy.setCheckInterval(checkInterval);
+                }
                 return readLockStrategy;
             } else if ("changed".equals(readLock)) {
                 GenericFileExclusiveReadLockStrategy readLockStrategy = new SftpChangedExclusiveReadLockStrategy();