You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/06/02 21:40:23 UTC
svn commit: r1345562 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/main/java/org/apache/camel/component/file/strategy/
camel-core/src/test/java/org/apache/camel/component/file/
components/camel-ftp/src/main...
Author: davsclaus
Date: Sat Jun 2 19:40:22 2012
New Revision: 1345562
URL: http://svn.apache.org/viewvc?rev=1345562&view=rev
Log:
CAMEL-5324: Improved file component to use marker file in read locks to support clustered Camel apps, with multiple file consumers competing for files on a shared drive.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameReadLockMustUseMarkerFileTest.java
- copied, changed from r1345498, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileDeleteOldLockFilesTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Sat Jun 2 19:40:22 2012
@@ -285,9 +285,14 @@ public abstract class GenericFileConsume
boolean begin = processStrategy.begin(operations, endpoint, exchange, file);
if (!begin) {
- log.debug(endpoint + " cannot begin processing file: {}", file);
- // begin returned false, so remove file from the in progress list as its no longer in progress
- endpoint.getInProgressRepository().remove(absoluteFileName);
+ log.debug("{} cannot begin processing file: {}", endpoint, file);
+ try {
+ // abort
+ processStrategy.abort(operations, endpoint, exchange, file);
+ } finally {
+ // begin returned false, so remove file from the in progress list as its no longer in progress
+ endpoint.getInProgressRepository().remove(absoluteFileName);
+ }
return;
}
} catch (Exception e) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProcessStrategy.java Sat Jun 2 19:40:22 2012
@@ -49,6 +49,20 @@ public interface GenericFileProcessStrat
Exchange exchange, GenericFile<T> file) throws Exception;
/**
+ * Called when a begin is aborted, for example to release any resources which may have
+ * been acquired during the {@link #begin(GenericFileOperations, GenericFileEndpoint, org.apache.camel.Exchange, GenericFile)}
+ * operation.
+ *
+ * @param operations file operations
+ * @param endpoint the endpoint
+ * @param exchange the exchange
+ * @param file the file
+ * @throws Exception can be thrown in case of errors
+ */
+ void abort(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint,
+ Exchange exchange, GenericFile<T> file) throws Exception;
+
+ /**
* Releases any file locks and possibly deletes or moves the file after
* successful processing
*
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java Sat Jun 2 19:40:22 2012
@@ -21,7 +21,6 @@ import java.io.IOException;
import org.apache.camel.Exchange;
import org.apache.camel.component.file.GenericFile;
-import org.apache.camel.component.file.GenericFileEndpoint;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
@@ -36,69 +35,54 @@ public class FileChangedExclusiveReadLoc
private long timeout;
private long checkInterval = 1000;
- @Override
- public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
- // noop
- }
-
public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+ // must call super
+ if (!super.acquireExclusiveReadLock(operations, file, exchange)) {
+ System.out.println("XXXX");
+ return false;
+ }
+
File target = new File(file.getAbsoluteFilePath());
boolean exclusive = false;
LOG.trace("Waiting for exclusive read lock to file: {}", file);
- try {
- long lastModified = Long.MIN_VALUE;
- long length = Long.MIN_VALUE;
- StopWatch watch = new StopWatch();
-
- while (!exclusive) {
- // timeout check
- if (timeout > 0) {
- long delta = watch.taken();
- if (delta > timeout) {
- LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
- // we could not get the lock within the timeout period, so return false
- return false;
- }
+ long lastModified = Long.MIN_VALUE;
+ long length = Long.MIN_VALUE;
+ StopWatch watch = new StopWatch();
+
+ while (!exclusive) {
+ // timeout check
+ if (timeout > 0) {
+ long delta = watch.taken();
+ if (delta > timeout) {
+ LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
+ // we could not get the lock within the timeout period, so return false
+ return false;
}
+ }
- long newLastModified = target.lastModified();
- long newLength = target.length();
+ long newLastModified = target.lastModified();
+ long newLength = target.length();
- LOG.trace("Previous last modified: {}, new last modified: {}", lastModified, newLastModified);
- LOG.trace("Previous length: {}, new length: {}", length, newLength);
+ LOG.trace("Previous last modified: {}, new last modified: {}", lastModified, newLastModified);
+ LOG.trace("Previous length: {}, new length: {}", length, newLength);
- if (newLastModified == lastModified && newLength == length) {
- // let super handle the last part of acquiring the lock now the file is not
- // currently being in progress of being copied as file length and modified
- // are stable
- exclusive = super.acquireExclusiveReadLock(operations, file, exchange);
- } else {
- // set new base file change information
- lastModified = newLastModified;
- length = newLength;
-
- boolean interrupted = sleep();
- if (interrupted) {
- // we were interrupted while sleeping, we are likely being shutdown so return false
- return false;
- }
+ if (newLastModified == lastModified && newLength == length && length != 0) {
+ // We consider that zero-length files are files in progress
+ LOG.trace("Read lock acquired.");
+ exclusive = true;
+ } else {
+ // set new base file change information
+ lastModified = newLastModified;
+ length = newLength;
+
+ boolean interrupted = sleep();
+ if (interrupted) {
+ // we were interrupted while sleeping, we are likely being shutdown so return false
+ return false;
}
}
- } catch (IOException e) {
- // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file
- // such as AntiVirus or MS Office that has special locks for it's supported files
- if (timeout == 0) {
- // if not using timeout, then we cant retry, so rethrow
- throw e;
- }
- LOG.debug("Cannot acquire read lock. Will try again.", e);
- boolean interrupted = sleep();
- if (interrupted) {
- // we were interrupted while sleeping, we are likely being shutdown so return false
- return false;
- }
}
return exclusive;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java Sat Jun 2 19:40:22 2012
@@ -26,7 +26,6 @@ import java.nio.channels.FileLock;
import org.apache.camel.Exchange;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileEndpoint;
-import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.StopWatch;
@@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory;
* After granting the read lock it is released, we just want to make sure that when we start
* consuming the file its not currently in progress of being written by third party.
*/
-public class FileLockExclusiveReadLockStrategy implements GenericFileExclusiveReadLockStrategy<File> {
+public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLockStrategy {
private static final transient Logger LOG = LoggerFactory.getLogger(FileLockExclusiveReadLockStrategy.class);
private long timeout;
private long checkInterval = 1000;
@@ -50,6 +49,11 @@ public class FileLockExclusiveReadLockSt
}
public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+ // must call super
+ if (!super.acquireExclusiveReadLock(operations, file, exchange)) {
+ return false;
+ }
+
File target = new File(file.getAbsoluteFilePath());
LOG.trace("Waiting for exclusive read lock to file: {}", target);
@@ -110,6 +114,10 @@ public class FileLockExclusiveReadLockSt
public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
+
+ // must call super
+ super.releaseExclusiveReadLock(operations, file, exchange);
+
if (lock != null) {
Channel channel = lock.channel();
try {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java Sat Jun 2 19:40:22 2012
@@ -121,7 +121,7 @@ public final class FileProcessStrategyFa
}
return readLockStrategy;
} else if ("rename".equals(readLock)) {
- GenericFileExclusiveReadLockStrategy<File> readLockStrategy = new GenericFileRenameExclusiveReadLockStrategy<File>();
+ GenericFileExclusiveReadLockStrategy<File> readLockStrategy = new FileRenameExclusiveReadLockStrategy();
Long timeout = (Long) params.get("readLockTimeout");
if (timeout != null) {
readLockStrategy.setTimeout(timeout);
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java?rev=1345562&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java Sat Jun 2 19:40:22 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileOperations;
+
+/**
+ * Acquires exclusive read lock to the given file. Will wait until the lock is granted.
+ * After granting the read lock it is released, we just want to make sure that when we start
+ * consuming the file its not currently in progress of being written by third party.
+ * <p/>
+ * This implementation is only supported by the File component, that leverages the {@link MarkerFileExclusiveReadLockStrategy}
+ * as well, to ensure only acquiring locks on files, which is not already in progress by another process,
+ * that have marked this using the marker file.
+ */
+public class FileRenameExclusiveReadLockStrategy extends GenericFileRenameExclusiveReadLockStrategy<File> {
+
+ private MarkerFileExclusiveReadLockStrategy marker = new MarkerFileExclusiveReadLockStrategy();
+
+ @Override
+ public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
+ // must call marker first
+ if (!marker.acquireExclusiveReadLock(operations, file, exchange)) {
+ return false;
+ }
+
+ return super.acquireExclusiveReadLock(operations, file, exchange);
+ }
+
+ @Override
+ public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
+ GenericFile<File> file, Exchange exchange) throws Exception {
+ // must call marker first
+ try {
+ marker.releaseExclusiveReadLock(operations, file, exchange);
+ } finally {
+ super.releaseExclusiveReadLock(operations, file, exchange);
+ }
+ }
+
+}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java Sat Jun 2 19:40:22 2012
@@ -30,6 +30,9 @@ import org.apache.camel.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Base class for implementations of {@link GenericFileProcessStrategy}.
+ */
public abstract class GenericFileProcessStrategySupport<T> implements GenericFileProcessStrategy<T> {
protected final transient Logger log = LoggerFactory.getLogger(getClass());
protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
@@ -53,6 +56,14 @@ public abstract class GenericFileProcess
return true;
}
+ public void abort(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
+ if (exclusiveReadLockStrategy != null) {
+ exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
+ }
+
+ deleteLocalWorkFile(exchange);
+ }
+
public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
if (exclusiveReadLockStrategy != null) {
exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileBeginFailureOneTimeTest.java Sat Jun 2 19:40:22 2012
@@ -83,6 +83,11 @@ public class FileBeginFailureOneTimeTest
return true;
}
+ public void abort(GenericFileOperations<File> fileGenericFileOperations, GenericFileEndpoint<File> fileGenericFileEndpoint,
+ Exchange exchange, GenericFile<File> fileGenericFile) throws Exception {
+ // noop
+ }
+
public void commit(GenericFileOperations<File> fileGenericFileOperations, GenericFileEndpoint<File> fileGenericFileEndpoint,
Exchange exchange, GenericFile<File> fileGenericFile) throws Exception {
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameReadLockMustUseMarkerFileTest.java (from r1345498, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileDeleteOldLockFilesTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameReadLockMustUseMarkerFileTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameReadLockMustUseMarkerFileTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileDeleteOldLockFilesTest.java&r1=1345498&r2=1345562&rev=1345562&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileMarkerFileDeleteOldLockFilesTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameReadLockMustUseMarkerFileTest.java Sat Jun 2 19:40:22 2012
@@ -16,25 +16,27 @@
*/
package org.apache.camel.component.file;
+import java.io.File;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
/**
* @version
*/
-public class FileMarkerFileDeleteOldLockFilesTest extends ContextTestSupport {
+public class FileRenameReadLockMustUseMarkerFileTest extends ContextTestSupport {
@Override
protected void setUp() throws Exception {
super.setUp();
- deleteDirectory("target/oldlock");
- template.sendBodyAndHeader("file:target/oldlock", "locked", Exchange.FILE_NAME, "hello.txt" + FileComponent.DEFAULT_LOCK_FILE_POSTFIX);
- template.sendBodyAndHeader("file:target/oldlock", "Bye World", Exchange.FILE_NAME, "bye.txt");
+ deleteDirectory("target/rename");
+ template.sendBodyAndHeader("file:target/rename", "Bye World", Exchange.FILE_NAME, "bye.txt");
}
- public void testDeleteOldLockOnStartup() throws Exception {
+ public void testCamelLockFile() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);
mock.expectedBodiesReceived("Bye World");
@@ -44,6 +46,12 @@ public class FileMarkerFileDeleteOldLock
context.startRoute("foo");
assertMockEndpointsSatisfied();
+
+ assertTrue(oneExchangeDone.matchesMockWaitTime());
+
+ // and lock file should be deleted
+ File lock = new File("target/rename/bye.txt" + FileComponent.DEFAULT_LOCK_FILE_POSTFIX);
+ assertFalse("Lock file should not exist: " + lock, lock.exists());
}
@Override
@@ -51,7 +59,18 @@ public class FileMarkerFileDeleteOldLock
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("file:target/oldlock").routeId("foo").noAutoStartup()
+ from("file:target/rename?readLock=rename").routeId("foo").noAutoStartup()
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // got a file, so we should have a .camelLock file as well
+ String name = exchange.getIn().getHeader(Exchange.FILE_PATH) + FileComponent.DEFAULT_LOCK_FILE_POSTFIX;
+ File lock = new File(name);
+
+ // lock file should exist
+ assertTrue("Lock file should exist: " + name, lock.exists());
+ }
+ })
.convertBodyTo(String.class).to("mock:result");
}
};
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java Sat Jun 2 19:40:22 2012
@@ -43,64 +43,49 @@ public class FtpChangedExclusiveReadLock
LOG.trace("Waiting for exclusive read lock to file: " + file);
- try {
- long lastModified = Long.MIN_VALUE;
- long length = Long.MIN_VALUE;
- StopWatch watch = new StopWatch();
-
- while (!exclusive) {
- // timeout check
- if (timeout > 0) {
- long delta = watch.taken();
- if (delta > timeout) {
- LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
- // we could not get the lock within the timeout period, so return false
- return false;
- }
+ long lastModified = Long.MIN_VALUE;
+ long length = Long.MIN_VALUE;
+ StopWatch watch = new StopWatch();
+
+ while (!exclusive) {
+ // timeout check
+ if (timeout > 0) {
+ long delta = watch.taken();
+ if (delta > timeout) {
+ LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
+ // we could not get the lock within the timeout period, so return false
+ return false;
}
+ }
- long newLastModified = 0;
- long newLength = 0;
- List<FTPFile> files = operations.listFiles(file.getParent());
- for (FTPFile f : files) {
- if (f.getName().equals(file.getFileName())) {
- newLastModified = f.getTimestamp().getTimeInMillis();
- newLength = f.getSize();
- }
+ long newLastModified = 0;
+ long newLength = 0;
+ List<FTPFile> files = operations.listFiles(file.getParent());
+ for (FTPFile f : files) {
+ if (f.getName().equals(file.getFileName())) {
+ newLastModified = f.getTimestamp().getTimeInMillis();
+ newLength = f.getSize();
}
+ }
- LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified);
- LOG.trace("Previous length: " + length + ", new length: " + newLength);
+ LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified);
+ LOG.trace("Previous length: " + length + ", new length: " + newLength);
- if (newLastModified == lastModified && newLength == length && length != 0) {
- // We consider that zero-length files are files in progress on some FTP servers
- LOG.trace("Read lock acquired.");
- exclusive = true;
- } else {
- // set new base file change information
- lastModified = newLastModified;
- length = newLength;
-
- boolean interrupted = sleep();
- if (interrupted) {
- // we were interrupted while sleeping, we are likely being shutdown so return false
- return false;
- }
+ if (newLastModified == lastModified && newLength == length && length != 0) {
+ // We consider that zero-length files are files in progress on some FTP servers
+ LOG.trace("Read lock acquired.");
+ exclusive = true;
+ } else {
+ // set new base file change information
+ lastModified = newLastModified;
+ length = newLength;
+
+ boolean interrupted = sleep();
+ if (interrupted) {
+ // we were interrupted while sleeping, we are likely being shutdown so return false
+ return false;
}
}
- } catch (Exception e) {
- // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file
- // such as AntiVirus or MS Office that has special locks for it's supported files
- if (timeout == 0) {
- // if not using timeout, then we cant retry, so rethrow
- throw e;
- }
- LOG.debug("Cannot acquire read lock. Will try again.", e);
- boolean interrupted = sleep();
- if (interrupted) {
- // we were interrupted while sleeping, we are likely being shutdown so return false
- return false;
- }
}
return exclusive;
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpProcessStrategyFactory.java Sat Jun 2 19:40:22 2012
@@ -107,6 +107,10 @@ public final class FtpProcessStrategyFac
if (timeout != null) {
readLockStrategy.setTimeout(timeout);
}
+ Long checkInterval = (Long) params.get("readLockCheckInterval");
+ if (checkInterval != null) {
+ readLockStrategy.setCheckInterval(checkInterval);
+ }
return readLockStrategy;
} else if ("changed".equals(readLock)) {
GenericFileExclusiveReadLockStrategy<FTPFile> readLockStrategy = new FtpChangedExclusiveReadLockStrategy();
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpChangedExclusiveReadLockStrategy.java Sat Jun 2 19:40:22 2012
@@ -43,64 +43,49 @@ public class SftpChangedExclusiveReadLoc
LOG.trace("Waiting for exclusive read lock to file: " + file);
- try {
- long lastModified = Long.MIN_VALUE;
- long length = Long.MIN_VALUE;
- StopWatch watch = new StopWatch();
-
- while (!exclusive) {
- // timeout check
- if (timeout > 0) {
- long delta = watch.taken();
- if (delta > timeout) {
- LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
- // we could not get the lock within the timeout period, so return false
- return false;
- }
+ long lastModified = Long.MIN_VALUE;
+ long length = Long.MIN_VALUE;
+ StopWatch watch = new StopWatch();
+
+ while (!exclusive) {
+ // timeout check
+ if (timeout > 0) {
+ long delta = watch.taken();
+ if (delta > timeout) {
+ LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
+ // we could not get the lock within the timeout period, so return false
+ return false;
}
+ }
- long newLastModified = 0;
- long newLength = 0;
- List<ChannelSftp.LsEntry> files = operations.listFiles(file.getParent());
- for (ChannelSftp.LsEntry f : files) {
- if (f.getFilename().equals(file.getFileName())) {
- newLastModified = f.getAttrs().getMTime();
- newLength = f.getAttrs().getSize();
- }
+ long newLastModified = 0;
+ long newLength = 0;
+ List<ChannelSftp.LsEntry> files = operations.listFiles(file.getParent());
+ for (ChannelSftp.LsEntry f : files) {
+ if (f.getFilename().equals(file.getFileName())) {
+ newLastModified = f.getAttrs().getMTime();
+ newLength = f.getAttrs().getSize();
}
+ }
- LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified);
- LOG.trace("Previous length: " + length + ", new length: " + newLength);
+ LOG.trace("Previous last modified: " + lastModified + ", new last modified: " + newLastModified);
+ LOG.trace("Previous length: " + length + ", new length: " + newLength);
- if (newLastModified == lastModified && newLength == length && length != 0) {
- // We consider that zero-length files are files in progress on some FTP servers
- LOG.trace("Read lock acquired.");
- exclusive = true;
- } else {
- // set new base file change information
- lastModified = newLastModified;
- length = newLength;
-
- boolean interrupted = sleep();
- if (interrupted) {
- // we were interrupted while sleeping, we are likely being shutdown so return false
- return false;
- }
+ if (newLastModified == lastModified && newLength == length && length != 0) {
+ // We consider that zero-length files are files in progress on some FTP servers
+ LOG.trace("Read lock acquired.");
+ exclusive = true;
+ } else {
+ // set new base file change information
+ lastModified = newLastModified;
+ length = newLength;
+
+ boolean interrupted = sleep();
+ if (interrupted) {
+ // we were interrupted while sleeping, we are likely being shutdown so return false
+ return false;
}
}
- } catch (Exception e) {
- // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file
- // such as AntiVirus or MS Office that has special locks for it's supported files
- if (timeout == 0) {
- // if not using timeout, then we cant retry, so rethrow
- throw e;
- }
- LOG.debug("Cannot acquire read lock. Will try again.", e);
- boolean interrupted = sleep();
- if (interrupted) {
- // we were interrupted while sleeping, we are likely being shutdown so return false
- return false;
- }
}
return exclusive;
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java?rev=1345562&r1=1345561&r2=1345562&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/SftpProcessStrategyFactory.java Sat Jun 2 19:40:22 2012
@@ -107,6 +107,10 @@ public final class SftpProcessStrategyFa
if (timeout != null) {
readLockStrategy.setTimeout(timeout);
}
+ Long checkInterval = (Long) params.get("readLockCheckInterval");
+ if (checkInterval != null) {
+ readLockStrategy.setCheckInterval(checkInterval);
+ }
return readLockStrategy;
} else if ("changed".equals(readLock)) {
GenericFileExclusiveReadLockStrategy readLockStrategy = new SftpChangedExclusiveReadLockStrategy();