You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/02/13 09:43:31 UTC
svn commit: r744032 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/main/java/org/apache/camel/component/file/strategy/
camel-core/src/main/java/org/apache/camel/model/
components/camel-ftp/src/main/java/org/...
Author: davsclaus
Date: Fri Feb 13 08:43:31 2009
New Revision: 744032
URL: http://svn.apache.org/viewvc?rev=744032&view=rev
Log:
CAMEL-1295: Introduced localWorkDirectory for the ftp component
Added:
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java (contents, props changed)
- copied, changed from r743796, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/NewFileOperations.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpComponent.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpComponent.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java Fri Feb 13 08:43:31 2009
@@ -38,6 +38,12 @@
if (GenericFile.class.isAssignableFrom(value.getClass())) {
GenericFile file = (GenericFile) value;
Class from = file.getBody().getClass();
+
+ // maybe from is already the type we want
+ if (from.isAssignableFrom(type)) {
+ return file.getBody();
+ }
+ // no then try to lookup a type converter
TypeConverter tc = registry.lookup(type, from);
if (tc != null) {
Object body = file.getBody();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Fri Feb 13 08:43:31 2009
@@ -51,6 +51,7 @@
protected GenericFileConfiguration configuration;
protected boolean directory = true;
+ protected String localWorkDirectory;
protected boolean autoCreate = true;
protected int bufferSize = 128 * 1024;
protected boolean append = true;
@@ -402,6 +403,14 @@
this.processStrategy = processStrategy;
}
+ public String getLocalWorkDirectory() {
+ return localWorkDirectory;
+ }
+
+ public void setLocalWorkDirectory(String localWorkDirectory) {
+ this.localWorkDirectory = localWorkDirectory;
+ }
+
/**
* Configures the given message with the file which sets the body to the
* file object.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/NewFileOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/NewFileOperations.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/NewFileOperations.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/NewFileOperations.java Fri Feb 13 08:43:31 2009
@@ -27,6 +27,7 @@
import java.util.List;
import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
@@ -118,18 +119,22 @@
File file = new File(fileName);
try {
- boolean fileSource = exchange.getIn().getBody() instanceof File;
- if (fileSource) {
- File source = ExchangeHelper.getMandatoryInBody(exchange, File.class);
+ File source = null;
+ try {
+ source = exchange.getIn().getBody(File.class);
+ } catch (NoTypeConversionAvailableException e) {
+ // ignore
+ }
+ if (source != null && source.exists()) {
writeFileByFile(source, file);
} else {
InputStream in = ExchangeHelper.getMandatoryInBody(exchange, InputStream.class);
writeFileByStream(in, file);
}
} catch (IOException e) {
- throw new GenericFileOperationFailedException("Can not store file: " + file, e);
+ throw new GenericFileOperationFailedException("Cannot store file: " + file, e);
} catch (InvalidPayloadException e) {
- throw new GenericFileOperationFailedException("Can not store file: " + file, e);
+ throw new GenericFileOperationFailedException("Cannot store file: " + file, e);
}
return true;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java Fri Feb 13 08:43:31 2009
@@ -16,14 +16,19 @@
*/
package org.apache.camel.component.file.strategy;
+import java.io.File;
+
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileEndpoint;
import org.apache.camel.component.file.GenericFileExchange;
import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.component.file.GenericFileProcessStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
public abstract class GenericFileProcessStrategySupport<T> implements GenericFileProcessStrategy<T> {
+ protected final transient Log log = LogFactory.getLog(getClass());
private GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, GenericFileExchange<T> exchange, GenericFile<T> file) throws Exception {
@@ -43,12 +48,16 @@
if (exclusiveReadLockStrategy != null) {
exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
}
+
+ deleteLocalWorkFile(exchange);
}
public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, GenericFileExchange<T> exchange, GenericFile<T> file) throws Exception {
if (exclusiveReadLockStrategy != null) {
exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
}
+
+ deleteLocalWorkFile(exchange);
}
public GenericFileExclusiveReadLockStrategy<T> getExclusiveReadLockStrategy() {
@@ -58,5 +67,17 @@
public void setExclusiveReadLockStrategy(GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy) {
this.exclusiveReadLockStrategy = exclusiveReadLockStrategy;
}
+
+ private void deleteLocalWorkFile(GenericFileExchange<T> exchange) {
+ // delete local work file, if it was used (eg by ftp component)
+ String path = exchange.getIn().getHeader("CamelFileLocalWorkPath", String.class);
+ if (path != null) {
+ File local = new File(path);
+ if (log.isTraceEnabled()) {
+ log.trace("Deleting lock work file: " + local);
+ }
+ local.delete();
+ }
+ }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java Fri Feb 13 08:43:31 2009
@@ -23,11 +23,8 @@
import org.apache.camel.component.file.GenericFileExchange;
import org.apache.camel.component.file.GenericFileOperationFailedException;
import org.apache.camel.component.file.GenericFileOperations;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrategySupport<T> {
- private static final transient Log LOG = LogFactory.getLog(org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy.class);
private GenericFileRenamer<T> beginRenamer;
private GenericFileRenamer<T> commitRenamer;
@@ -77,8 +74,8 @@
throw new GenericFileOperationFailedException("Cannot create directory: " + to.getParent() + " (could be because of denied permissions)");
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming file: " + from + " to: " + to);
+ if (log.isDebugEnabled()) {
+ log.debug("Renaming file: " + from + " to: " + to);
}
boolean renamed = operations.renameFile(from.getAbsoluteFileName(), to.getAbsoluteFileName());
if (!renamed) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java Fri Feb 13 08:43:31 2009
@@ -152,7 +152,7 @@
/**
* Uses fail over load balancer
*
- * @retrun the builder
+ * @return the builder
*/
public LoadBalanceType failOver() {
loadBalancerType = new LoadBalancerType(new FailOverLoadBalancer());
@@ -162,8 +162,8 @@
/**
* Uses fail over load balancer
*
- * @param the exception Class which we want to catch
- * @retrun the builder
+ * @param throwable exception Class which we want to catch
+ * @return the builder
*/
public LoadBalanceType failOver(Class throwable) {
loadBalancerType = new LoadBalancerType(new FailOverLoadBalancer(throwable));
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpComponent.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpComponent.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpComponent.java Fri Feb 13 08:43:31 2009
@@ -50,7 +50,10 @@
FtpConfiguration config = new FtpConfiguration(new URI(baseUri));
FtpOperations operations = new FtpOperations();
- return new FtpEndpoint(uri, this, operations, config);
+ FtpEndpoint result = new FtpEndpoint(uri, this, operations, config);
+ operations.setEndpoint(result);
+
+ return result;
}
protected void afterPropertiesSet(GenericFileEndpoint<FTPFile> endpoint) throws Exception {
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java Fri Feb 13 08:43:31 2009
@@ -89,17 +89,19 @@
}
}
+ @SuppressWarnings("unchecked")
private RemoteFile<FTPFile> asRemoteFile(String directory, FTPFile file) {
- RemoteFile<FTPFile> remote = new RemoteFile<FTPFile>();
- remote.setFile(file);
- remote.setFileName(file.getName());
- remote.setFileLength(file.getSize());
+ RemoteFile<FTPFile> answer = new RemoteFile<FTPFile>();
+
+ answer.setFile(file);
+ answer.setFileName(file.getName());
+ answer.setFileLength(file.getSize());
if (file.getTimestamp() != null) {
- remote.setLastModified(file.getTimestamp().getTimeInMillis());
+ answer.setLastModified(file.getTimestamp().getTimeInMillis());
}
- remote.setHostname(((RemoteFileConfiguration) endpoint.getConfiguration()).getHost());
+ answer.setHostname(((RemoteFileConfiguration) endpoint.getConfiguration()).getHost());
String absoluteFileName = (ObjectHelper.isNotEmpty(directory) ? directory + "/" : "") + file.getName();
- remote.setAbsoluteFileName(absoluteFileName);
+ answer.setAbsoluteFileName(absoluteFileName);
// the relative filename
String ftpBasePath = endpoint.getConfiguration().getFile();
@@ -107,9 +109,9 @@
if (relativePath.startsWith("/")) {
relativePath = relativePath.substring(1);
}
- remote.setRelativeFileName(relativePath);
+ answer.setRelativeFileName(relativePath);
- return remote;
+ return answer;
}
}
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java Fri Feb 13 08:43:31 2009
@@ -17,6 +17,8 @@
package org.apache.camel.component.file.remote;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -40,6 +42,7 @@
public class FtpOperations implements RemoteFileOperations<FTPFile> {
private static final transient Log LOG = LogFactory.getLog(FtpOperations.class);
private final FTPClient client;
+ private GenericFileEndpoint endpoint;
public FtpOperations() {
this.client = new FTPClient();
@@ -50,7 +53,7 @@
}
public void setEndpoint(GenericFileEndpoint endpoint) {
- // noop
+ this.endpoint = endpoint;
}
public boolean connect(RemoteFileConfiguration config) throws GenericFileOperationFailedException {
@@ -167,21 +170,98 @@
}
public boolean retrieveFile(String name, GenericFileExchange<FTPFile> exchange) throws GenericFileOperationFailedException {
+ if (ObjectHelper.isNotEmpty(endpoint.getLocalWorkDirectory())) {
+ // local work directory is configured so we should store file content as files in this local directory
+ return retrieveFileToFileInLocalWorkDirectory(name, exchange);
+ } else {
+ // store file content directly as stream on the body
+ return retrieveFileToStreamInBody(name, exchange);
+ }
+ }
+
+ private boolean retrieveFileToStreamInBody(String name, GenericFileExchange<FTPFile> exchange) throws GenericFileOperationFailedException {
+ OutputStream os = null;
try {
+ os = new ByteArrayOutputStream();
GenericFile<FTPFile> target = exchange.getGenericFile();
- OutputStream os = new ByteArrayOutputStream();
target.setBody(os);
return client.retrieveFile(name, os);
} catch (IOException e) {
throw new RemoteFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
+ } finally {
+ ObjectHelper.close(os, "retrieve: " + name, LOG);
+ }
+ }
+
+ private boolean retrieveFileToFileInLocalWorkDirectory(String name, GenericFileExchange<FTPFile> exchange) throws GenericFileOperationFailedException {
+ File temp;
+ File local = new File(endpoint.getLocalWorkDirectory());
+ OutputStream os;
+ try {
+ // use relative filename in local work directory
+ String relativeName = exchange.getGenericFile().getRelativeFileName();
+
+ temp = new File(local, relativeName + ".inprogress");
+ local = new File(local, relativeName);
+
+ // create directory to local work file
+ local.mkdirs();
+
+ // delete any existing files
+ if (temp.exists()) {
+ if (!temp.delete()) {
+ throw new RemoteFileOperationFailedException("Cannot delete existing local work file: " + temp);
+ }
+ }
+ if (local.exists()) {
+ if (!local.delete()) {
+ throw new RemoteFileOperationFailedException("Cannot delete existing local work file: " + local);
+ }
+ }
+
+ // create new temp local work file
+ if (!temp.createNewFile()) {
+ throw new RemoteFileOperationFailedException("Cannot create new local work file: " + temp);
+ }
+
+ // store content as a file in the local work directory in the temp handle
+ os = new FileOutputStream(temp);
+
+ // set header with the path to the local work file
+ exchange.getIn().setHeader("CamelFileLocalWorkPath", local.getPath());
+
+ } catch (Exception e) {
+ throw new RemoteFileOperationFailedException("Cannot create new local work file: " + local);
+ }
+
+ boolean result;
+ try {
+ GenericFile<FTPFile> target = exchange.getGenericFile();
+ // store the java.io.File handle as the body
+ target.setBody(local);
+ result = client.retrieveFile(name, os);
+
+ // rename temp to local after we have retrieved the data
+ if (!temp.renameTo(local)) {
+ throw new RemoteFileOperationFailedException("Cannot rename local work file from: " + temp + " to: " + local);
+ }
+ } catch (IOException e) {
+ throw new RemoteFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
+ } finally {
+ ObjectHelper.close(os, "retrieve: " + name, LOG);
}
+
+ return result;
}
public boolean storeFile(String name, GenericFileExchange<FTPFile> exchange) throws GenericFileOperationFailedException {
+ InputStream is = exchange.getIn().getBody(InputStream.class);
try {
- return client.storeFile(name, exchange.getIn().getBody(InputStream.class));
+ return client.storeFile(name, is);
} catch (IOException e) {
throw new RemoteFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
+ } finally {
+ ObjectHelper.close(is, "store: " + name, LOG);
}
}
@@ -221,12 +301,20 @@
}
}
+ public boolean deleteFile(String name) throws GenericFileOperationFailedException {
+ try {
+ return this.client.deleteFile(name);
+ } catch (IOException e) {
+ throw new RemoteFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
+ }
+ }
+
private boolean buildDirectoryChunks(String dirName) throws IOException {
final StringBuilder sb = new StringBuilder(dirName.length());
final String[] dirs = dirName.split("\\/|\\\\");
boolean success = false;
- for (String dir : dirs) {
+ for (String dir : dirs) {
sb.append(dir).append('/');
String directory = sb.toString();
@@ -243,21 +331,4 @@
return success;
}
- public FTPClient changeCurrentDirectory(FTPClient client, String path) throws GenericFileOperationFailedException {
- try {
- client.changeWorkingDirectory(path);
- return client;
- } catch (IOException e) {
- throw new RemoteFileOperationFailedException("Failed to delete [" + path + "]", e);
- }
- }
-
- public boolean deleteFile(String name) throws GenericFileOperationFailedException {
- try {
- return this.client.deleteFile(name);
- } catch (IOException e) {
- throw new RemoteFileOperationFailedException("Failed to delete [" + name + "]", e);
- }
- }
-
}
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpComponent.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpComponent.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpComponent.java Fri Feb 13 08:43:31 2009
@@ -50,7 +50,9 @@
SftpConfiguration config = new SftpConfiguration(new URI(baseUri));
SftpOperations operations = new SftpOperations();
- return new SftpEndpoint(uri, this, operations, config);
+ SftpEndpoint result = new SftpEndpoint(uri, this, operations, config);
+ operations.setEndpoint(result);
+ return result;
}
protected void afterPropertiesSet(GenericFileEndpoint<ChannelSftp.LsEntry> endpoint) throws Exception {
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java Fri Feb 13 08:43:31 2009
@@ -17,6 +17,8 @@
package org.apache.camel.component.file.remote;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -46,11 +48,12 @@
*/
public class SftpOperations implements RemoteFileOperations<ChannelSftp.LsEntry> {
private static final transient Log LOG = LogFactory.getLog(SftpOperations.class);
+ private GenericFileEndpoint endpoint;
private ChannelSftp channel;
private Session session;
public void setEndpoint(GenericFileEndpoint endpoint) {
- // noop
+ this.endpoint = endpoint;
}
public boolean connect(RemoteFileConfiguration configuration) throws GenericFileOperationFailedException {
@@ -194,9 +197,9 @@
}
}
} catch (IOException e) {
- throw new RemoteFileOperationFailedException("Cannot build directory " + directory, e);
+ throw new RemoteFileOperationFailedException("Cannot build directory: " + directory, e);
} catch (SftpException e) {
- throw new RemoteFileOperationFailedException("Cannot build directory " + directory, e);
+ throw new RemoteFileOperationFailedException("Cannot build directory: " + directory, e);
} finally {
// change back to original directory
if (originalDirectory != null) {
@@ -272,6 +275,16 @@
}
public boolean retrieveFile(String name, GenericFileExchange<ChannelSftp.LsEntry> exchange) throws GenericFileOperationFailedException {
+ if (ObjectHelper.isNotEmpty(endpoint.getLocalWorkDirectory())) {
+ // local work directory is configured so we should store file content as files in this local directory
+ return retrieveFileToFileInLocalWorkDirectory(name, exchange);
+ } else {
+ // store file content directly as stream on the body
+ return retrieveFileToStreamInBody(name, exchange);
+ }
+ }
+
+ private boolean retrieveFileToStreamInBody(String name, GenericFileExchange<ChannelSftp.LsEntry> exchange) throws GenericFileOperationFailedException {
try {
GenericFile<ChannelSftp.LsEntry> target = exchange.getGenericFile();
OutputStream os = new ByteArrayOutputStream();
@@ -279,19 +292,79 @@
channel.get(name, os);
return true;
} catch (SftpException e) {
- throw new RemoteFileOperationFailedException("Could not retrieve the file [" + name + "]", e);
+ throw new RemoteFileOperationFailedException("Cannot retrieve file: " + name, e);
}
}
+ private boolean retrieveFileToFileInLocalWorkDirectory(String name, GenericFileExchange<ChannelSftp.LsEntry> exchange) throws GenericFileOperationFailedException {
+ File temp;
+ File local = new File(endpoint.getLocalWorkDirectory());
+ OutputStream os;
+ try {
+ // use relative filename in local work directory
+ String relativeName = exchange.getGenericFile().getRelativeFileName();
+
+ temp = new File(local, relativeName + ".inprogress");
+ local = new File(local, relativeName);
+
+ // create directory to local work file
+ local.mkdirs();
+
+ // delete any existing files
+ if (temp.exists()) {
+ if (!temp.delete()) {
+ throw new RemoteFileOperationFailedException("Cannot delete existing local work file: " + temp);
+ }
+ }
+ if (local.exists()) {
+ if (!local.delete()) {
+ throw new RemoteFileOperationFailedException("Cannot delete existing local work file: " + local);
+ }
+ }
+
+ // create new temp local work file
+ if (!temp.createNewFile()) {
+ throw new RemoteFileOperationFailedException("Cannot create new local work file: " + temp);
+ }
+
+ // store content as a file in the local work directory in the temp handle
+ os = new FileOutputStream(temp);
+
+ // set header with the path to the local work file
+ exchange.getIn().setHeader("CamelFileLocalWorkPath", local.getPath());
+
+ } catch (Exception e) {
+ throw new RemoteFileOperationFailedException("Cannot create new local work file: " + local);
+ }
+
+ try {
+ GenericFile<ChannelSftp.LsEntry> target = exchange.getGenericFile();
+ // store the java.io.File handle as the body
+ target.setBody(local);
+ channel.get(name, os);
+
+ // rename temp to local after we have retrieved the data
+ if (!temp.renameTo(local)) {
+ throw new RemoteFileOperationFailedException("Cannot rename local work file from: " + temp + " to: " + local);
+ }
+ } catch (SftpException e) {
+ throw new RemoteFileOperationFailedException("Cannot retrieve file: " + name, e);
+ } finally {
+ ObjectHelper.close(os, "retrieve: " + name, LOG);
+ }
+
+ return true;
+ }
+
public boolean storeFile(String name, GenericFileExchange<ChannelSftp.LsEntry> exchange) throws GenericFileOperationFailedException {
try {
InputStream in = ExchangeHelper.getMandatoryInBody(exchange, InputStream.class);
channel.put(in, name);
return true;
} catch (SftpException e) {
- throw new RemoteFileOperationFailedException("Could not write the file [" + name + "]", e);
+ throw new RemoteFileOperationFailedException("Cannot store file: " + name, e);
} catch (InvalidPayloadException e) {
- throw new RemoteFileOperationFailedException("Could not write the file [" + name + "]", e);
+ throw new RemoteFileOperationFailedException("Cannot store file: " + name, e);
}
}
Modified: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java Fri Feb 13 08:43:31 2009
@@ -26,7 +26,7 @@
private String getFtpUrl() {
return "ftp://admin@localhost:" + getPort()
- + "/idempotent?password=admin&binary=false&idempotent=true&delete=true";
+ + "/idempotent?password=admin&binary=false&idempotent=true&delete=true&delay=1000";
}
@Override
Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java (from r743796, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java&r1=743796&r2=744032&rev=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java Fri Feb 13 08:43:31 2009
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.file.remote;
+import java.io.File;
+
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -23,18 +25,20 @@
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.NewFileComponent;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.IOConverter;
/**
* @version $Revision$
*/
-public class FtpConsumerBodyAsStringTest extends FtpServerTestSupport {
+public class FtpConsumerLocalWorkDirectoryTest extends FtpServerTestSupport {
private String getFtpUrl() {
- return "ftp://admin@localhost:" + getPort() + "/tmp4/camel?password=admin&consumer.delay=5000";
+ return "ftp://admin@localhost:" + getPort() + "/lwd/?password=admin&delay=5000&localWorkDirectory=target/lwd";
}
@Override
protected void setUp() throws Exception {
+ deleteDirectory("target/lwd");
super.setUp();
prepareFtpServer();
}
@@ -52,12 +56,23 @@
producer.stop();
}
- public void testSingleFileTest() throws Exception {
+ public void testLocalWorkDirectory() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Hello World");
mock.expectedMessageCount(1);
assertMockEndpointsSatisfied();
+
+ Thread.sleep(200);
+
+ // now the lwd file should be deleted
+ File local = new File("target/lwd/hello.txt").getAbsoluteFile();
+ assertFalse("Local work file should have been deleted", local.exists());
+
+ // and the out file should exist
+ File out = new File("target/out/hello.txt").getAbsoluteFile();
+ assertTrue("file should exists", out.exists());
+ assertEquals("Hello World", IOConverter.toString(out));
}
protected RouteBuilder createRouteBuilder() throws Exception {
@@ -65,11 +80,12 @@
public void configure() throws Exception {
from(getFtpUrl()).process(new Processor() {
public void process(Exchange exchange) throws Exception {
- String body = exchange.getIn().getBody(String.class);
+ File body = exchange.getIn().getBody(File.class);
assertNotNull(body);
- assertEquals("Hello World", body);
+ assertTrue("Local work file should exists", body.exists());
+ assertEquals("target/lwd/hello.txt", body.getPath());
}
- }).to("mock:result");
+ }).to("file://target/out", "mock:result");
}
};
}
Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date