You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/12/17 09:38:29 UTC
svn commit: r1422793 - in /camel/branches/camel-2.10.x: ./
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/test/java/org/apache/camel/component/file/
components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/
Author: davsclaus
Date: Mon Dec 17 08:38:27 2012
New Revision: 1422793
URL: http://svn.apache.org/viewvc?rev=1422793&view=rev
Log:
CAMEL-5848: file/ftp consumers - When using doneFileName, avoid picking up files in the middle of a group if the done file is written while the directory is being scanned
Added:
camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java
- copied unchanged from r1422792, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeDoneFileIssueTest.java
Modified:
camel/branches/camel-2.10.x/ (props changed)
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Merged /camel/trunk:r1422792
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=1422793&r1=1422792&r2=1422793&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java Mon Dec 17 08:38:27 2012
@@ -17,6 +17,7 @@
package org.apache.camel.component.file;
import java.io.File;
+import java.util.Arrays;
import java.util.List;
import org.apache.camel.Processor;
@@ -51,8 +52,8 @@ public class FileConsumer extends Generi
}
log.trace("Polling directory: {}", directory.getPath());
- File[] files = directory.listFiles();
- if (files == null || files.length == 0) {
+ File[] dirFiles = directory.listFiles();
+ if (dirFiles == null || dirFiles.length == 0) {
// no files in this directory to poll
if (log.isTraceEnabled()) {
log.trace("No files found in directory: {}", directory.getPath());
@@ -61,9 +62,10 @@ public class FileConsumer extends Generi
} else {
// we found some files
if (log.isTraceEnabled()) {
- log.trace("Found {} in directory: {}", files.length, directory.getPath());
+ log.trace("Found {} in directory: {}", dirFiles.length, directory.getPath());
}
}
+ List<File> files = Arrays.asList(dirFiles);
for (File file : files) {
// check if we can continue polling in files
@@ -81,7 +83,7 @@ public class FileConsumer extends Generi
GenericFile<File> gf = asGenericFile(endpointPath, file, getEndpoint().getCharset());
if (file.isDirectory()) {
- if (endpoint.isRecursive() && isValidFile(gf, true) && depth < endpoint.getMaxDepth()) {
+ if (endpoint.isRecursive() && isValidFile(gf, true, files) && depth < endpoint.getMaxDepth()) {
// recursive scan and add the sub files and folders
String subDirectory = fileName + File.separator + file.getName();
boolean canPollMore = pollDirectory(subDirectory, fileList, depth);
@@ -91,7 +93,7 @@ public class FileConsumer extends Generi
}
} else {
// Windows can report false to a file on a share so regard it always as a file (if its not a directory)
- if (isValidFile(gf, false) && depth >= endpoint.minDepth) {
+ if (isValidFile(gf, false, files) && depth >= endpoint.minDepth) {
if (isInProgress(gf)) {
if (log.isTraceEnabled()) {
log.trace("Skipping as file is already in progress: {}", gf.getFileName());
@@ -109,6 +111,19 @@ public class FileConsumer extends Generi
return true;
}
+ @Override
+ protected boolean isMatched(GenericFile<File> file, String doneFileName, List<File> files) {
+ String onlyName = FileUtil.stripPath(doneFileName);
+ // the done file name must be among the files
+ for (File f : files) {
+ if (f.getName().equals(onlyName)) {
+ return true;
+ }
+ }
+ log.trace("Done file: {} does not exist", doneFileName);
+ return false;
+ }
+
/**
* Creates a new GenericFile<File> based on the given file.
*
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1422793&r1=1422792&r2=1422793&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Mon Dec 17 08:38:27 2012
@@ -389,10 +389,11 @@ public abstract class GenericFileConsume
*
* @param file the file
* @param isDirectory whether the file is a directory or a file
+ * @param files files in the directory
* @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
*/
- protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) {
- if (!isMatched(file, isDirectory)) {
+ protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) {
+ if (!isMatched(file, isDirectory, files)) {
log.trace("File did not match. Will skip this file: {}", file);
return false;
} else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) {
@@ -416,9 +417,10 @@ public abstract class GenericFileConsume
*
* @param file the file
* @param isDirectory whether the file is a directory or a file
+ * @param files files in the directory
* @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
*/
- protected boolean isMatched(GenericFile<T> file, boolean isDirectory) {
+ protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) {
String name = file.getFileNameOnly();
// folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
@@ -482,7 +484,7 @@ public abstract class GenericFileConsume
return false;
}
- if (!isMatched(file, doneFileName)) {
+ if (!isMatched(file, doneFileName, files)) {
return false;
}
}
@@ -494,19 +496,11 @@ public abstract class GenericFileConsume
* Strategy to perform file matching based on endpoint configuration in terms of done file name.
*
* @param file the file
- * @param doneFileName the done file name
+ * @param doneFileName the done file name (without any paths)
+ * @param files files in the directory
* @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
*/
- protected boolean isMatched(GenericFile<T> file, String doneFileName) {
- // the file is only valid if the done file exist
- if (!operations.existsFile(doneFileName)) {
- log.trace("Done file: {} does not exist", doneFileName);
- return false;
- }
-
- // assume matched
- return true;
- }
+ protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files);
/**
* Is the given file already in progress.
Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=1422793&r1=1422792&r2=1422793&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Mon Dec 17 08:38:27 2012
@@ -794,7 +794,6 @@ public abstract class GenericFileEndpoin
if (ObjectHelper.isNotEmpty(path) && ObjectHelper.isNotEmpty(pattern)) {
// done file must always be in same directory as the real file name
answer = path + getFileSeparator() + pattern;
- answer = path + File.separator + pattern;
}
if (getConfiguration().needToNormalize()) {
Modified: camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=1422793&r1=1422792&r2=1422793&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java Mon Dec 17 08:38:27 2012
@@ -108,7 +108,7 @@ public class FtpConsumer extends RemoteF
if (file.isDirectory()) {
RemoteFile<FTPFile> remote = asRemoteFile(absolutePath, file);
- if (endpoint.isRecursive() && isValidFile(remote, true) && depth < endpoint.getMaxDepth()) {
+ if (endpoint.isRecursive() && isValidFile(remote, true, files) && depth < endpoint.getMaxDepth()) {
// recursive scan and add the sub files and folders
String subDirectory = file.getName();
String path = absolutePath + "/" + subDirectory;
@@ -119,7 +119,7 @@ public class FtpConsumer extends RemoteF
}
} else if (file.isFile()) {
RemoteFile<FTPFile> remote = asRemoteFile(absolutePath, file);
- if (isValidFile(remote, false) && depth >= endpoint.getMinDepth()) {
+ if (isValidFile(remote, false, files) && depth >= endpoint.getMinDepth()) {
if (isInProgress(remote)) {
log.trace("Skipping as file is already in progress: {}", remote.getFileName());
} else {
@@ -135,6 +135,20 @@ public class FtpConsumer extends RemoteF
return true;
}
+ @Override
+ protected boolean isMatched(GenericFile<FTPFile> file, String doneFileName, List<FTPFile> files) {
+ String onlyName = FileUtil.stripPath(doneFileName);
+
+ for (FTPFile f : files) {
+ if (f.getName().equals(onlyName)) {
+ return true;
+ }
+ }
+
+ log.trace("Done file: {} does not exist", doneFileName);
+ return false;
+ }
+
private RemoteFile<FTPFile> asRemoteFile(String absolutePath, FTPFile file) {
RemoteFile<FTPFile> answer = new RemoteFile<FTPFile>();
Modified: camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=1422793&r1=1422792&r2=1422793&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Mon Dec 17 08:38:27 2012
@@ -20,10 +20,8 @@ import java.io.IOException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileConsumer;
import org.apache.camel.component.file.GenericFileOperationFailedException;
-import org.apache.camel.util.FileUtil;
/**
* Base class for remote file consumers.
@@ -155,14 +153,4 @@ public abstract class RemoteFileConsumer
return ((RemoteFileEndpoint<?>) endpoint).remoteServerInformation();
}
- @Override
- protected boolean isMatched(GenericFile<T> file, String doneFileName) {
- // ftp specific as we need to cater for stepwise
- if (getEndpoint().getConfiguration().isStepwise()) {
- // stepwise enabled, so done file should always be without path
- doneFileName = FileUtil.stripPath(doneFileName);
- }
-
- return super.isMatched(file, doneFileName);
- }
}
Modified: camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=1422793&r1=1422792&r2=1422793&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Mon Dec 17 08:38:27 2012
@@ -105,7 +105,7 @@ public class SftpConsumer extends Remote
if (file.getAttrs().isDir()) {
RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(absolutePath, file);
- if (endpoint.isRecursive() && isValidFile(remote, true) && depth < endpoint.getMaxDepth()) {
+ if (endpoint.isRecursive() && isValidFile(remote, true, files) && depth < endpoint.getMaxDepth()) {
// recursive scan and add the sub files and folders
String subDirectory = file.getFilename();
String path = absolutePath + "/" + subDirectory;
@@ -118,7 +118,7 @@ public class SftpConsumer extends Remote
// just assuming its a file we should poll
} else {
RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(absolutePath, file);
- if (isValidFile(remote, false) && depth >= endpoint.getMinDepth()) {
+ if (isValidFile(remote, false, files) && depth >= endpoint.getMinDepth()) {
if (isInProgress(remote)) {
if (log.isTraceEnabled()) {
log.trace("Skipping as file is already in progress: {}", remote.getFileName());
@@ -134,6 +134,20 @@ public class SftpConsumer extends Remote
return true;
}
+ @Override
+ protected boolean isMatched(GenericFile<ChannelSftp.LsEntry> file, String doneFileName, List<ChannelSftp.LsEntry> files) {
+ String onlyName = FileUtil.stripPath(doneFileName);
+
+ for (ChannelSftp.LsEntry f : files) {
+ if (f.getFilename().equals(onlyName)) {
+ return true;
+ }
+ }
+
+ log.trace("Done file: {} does not exist", doneFileName);
+ return false;
+ }
+
private RemoteFile<ChannelSftp.LsEntry> asRemoteFile(String absolutePath, ChannelSftp.LsEntry file) {
RemoteFile<ChannelSftp.LsEntry> answer = new RemoteFile<ChannelSftp.LsEntry>();