You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/02/13 09:43:31 UTC

svn commit: r744032 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/component/file/strategy/ camel-core/src/main/java/org/apache/camel/model/ components/camel-ftp/src/main/java/org/...

Author: davsclaus
Date: Fri Feb 13 08:43:31 2009
New Revision: 744032

URL: http://svn.apache.org/viewvc?rev=744032&view=rev
Log:
CAMEL-1295: Introduced localWorkDirectory for the ftp component

Added:
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java   (contents, props changed)
      - copied, changed from r743796, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/NewFileOperations.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpComponent.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpComponent.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java Fri Feb 13 08:43:31 2009
@@ -38,6 +38,12 @@
         if (GenericFile.class.isAssignableFrom(value.getClass())) {
             GenericFile file = (GenericFile) value;
             Class from = file.getBody().getClass();
+
+            // maybe from is already the type we want
+            if (from.isAssignableFrom(type)) {
+                return file.getBody();
+            }
+            // no then try to lookup a type converter
             TypeConverter tc = registry.lookup(type, from);
             if (tc != null) {
                 Object body = file.getBody();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java Fri Feb 13 08:43:31 2009
@@ -51,6 +51,7 @@
     protected GenericFileConfiguration configuration;
 
     protected boolean directory = true;
+    protected String localWorkDirectory;
     protected boolean autoCreate = true;
     protected int bufferSize = 128 * 1024;
     protected boolean append = true;
@@ -402,6 +403,14 @@
         this.processStrategy = processStrategy;
     }
 
+    public String getLocalWorkDirectory() {
+        return localWorkDirectory;
+    }
+
+    public void setLocalWorkDirectory(String localWorkDirectory) {
+        this.localWorkDirectory = localWorkDirectory;
+    }
+
     /**
      * Configures the given message with the file which sets the body to the
      * file object.

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/NewFileOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/NewFileOperations.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/NewFileOperations.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/NewFileOperations.java Fri Feb 13 08:43:31 2009
@@ -27,6 +27,7 @@
 import java.util.List;
 
 import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
@@ -118,18 +119,22 @@
         
         File file = new File(fileName);
         try {
-            boolean fileSource = exchange.getIn().getBody() instanceof File;
-            if (fileSource) {
-                File source = ExchangeHelper.getMandatoryInBody(exchange, File.class);
+            File source = null;
+            try {
+                source = exchange.getIn().getBody(File.class);
+            } catch (NoTypeConversionAvailableException e) {
+                // ignore
+            }
+            if (source != null && source.exists()) {
                 writeFileByFile(source, file);
             } else {
                 InputStream in = ExchangeHelper.getMandatoryInBody(exchange, InputStream.class);
                 writeFileByStream(in, file);
             }
         } catch (IOException e) {            
-            throw new GenericFileOperationFailedException("Can not store file: " + file, e);
+            throw new GenericFileOperationFailedException("Cannot store file: " + file, e);
         } catch (InvalidPayloadException e) {
-            throw new GenericFileOperationFailedException("Can not store file: " + file, e);
+            throw new GenericFileOperationFailedException("Cannot store file: " + file, e);
         }
 
         return true;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java Fri Feb 13 08:43:31 2009
@@ -16,14 +16,19 @@
  */
 package org.apache.camel.component.file.strategy;
 
+import java.io.File;
+
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileEndpoint;
 import org.apache.camel.component.file.GenericFileExchange;
 import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
 import org.apache.camel.component.file.GenericFileOperations;
 import org.apache.camel.component.file.GenericFileProcessStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public abstract class GenericFileProcessStrategySupport<T> implements GenericFileProcessStrategy<T> {
+    protected final transient Log log = LogFactory.getLog(getClass());
     private GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
 
     public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, GenericFileExchange<T> exchange, GenericFile<T> file) throws Exception {
@@ -43,12 +48,16 @@
         if (exclusiveReadLockStrategy != null) {
             exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
         }
+
+        deleteLocalWorkFile(exchange);
     }
 
     public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, GenericFileExchange<T> exchange, GenericFile<T> file) throws Exception {
         if (exclusiveReadLockStrategy != null) {
             exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange);
         }
+
+        deleteLocalWorkFile(exchange);
     }
 
     public GenericFileExclusiveReadLockStrategy<T> getExclusiveReadLockStrategy() {
@@ -58,5 +67,17 @@
     public void setExclusiveReadLockStrategy(GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy) {
         this.exclusiveReadLockStrategy = exclusiveReadLockStrategy;
     }
+
+    private void deleteLocalWorkFile(GenericFileExchange<T> exchange) {
+        // delete local work file, if it was used (eg by ftp component)
+        String path = exchange.getIn().getHeader("CamelFileLocalWorkPath", String.class);
+        if (path != null) {
+            File local = new File(path);
+            if (log.isTraceEnabled()) {
+                log.trace("Deleting lock work file: " + local);
+            }
+            local.delete();
+        }
+    }
 }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java Fri Feb 13 08:43:31 2009
@@ -23,11 +23,8 @@
 import org.apache.camel.component.file.GenericFileExchange;
 import org.apache.camel.component.file.GenericFileOperationFailedException;
 import org.apache.camel.component.file.GenericFileOperations;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrategySupport<T> {
-    private static final transient Log LOG = LogFactory.getLog(org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy.class);
     private GenericFileRenamer<T> beginRenamer;
     private GenericFileRenamer<T> commitRenamer;
 
@@ -77,8 +74,8 @@
             throw new GenericFileOperationFailedException("Cannot create directory: " + to.getParent() + " (could be because of denied permissions)");
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Renaming file: " + from + " to: " + to);
+        if (log.isDebugEnabled()) {
+            log.debug("Renaming file: " + from + " to: " + to);
         }
         boolean renamed = operations.renameFile(from.getAbsoluteFileName(), to.getAbsoluteFileName());
         if (!renamed) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceType.java Fri Feb 13 08:43:31 2009
@@ -152,7 +152,7 @@
     /**
      * Uses fail over load balancer
      * 
-     * @retrun the builder
+     * @return the builder
      */
     public LoadBalanceType failOver() {
         loadBalancerType = new LoadBalancerType(new FailOverLoadBalancer());
@@ -162,8 +162,8 @@
     /**
      * Uses fail over load balancer
      * 
-     * @param the exception Class which we want to catch
-     * @retrun the builder
+     * @param throwable exception Class which we want to catch
+     * @return the builder
      */
     public LoadBalanceType failOver(Class throwable) {
         loadBalancerType = new LoadBalancerType(new FailOverLoadBalancer(throwable));

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpComponent.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpComponent.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpComponent.java Fri Feb 13 08:43:31 2009
@@ -50,7 +50,10 @@
         FtpConfiguration config = new FtpConfiguration(new URI(baseUri));
 
         FtpOperations operations = new FtpOperations();
-        return new FtpEndpoint(uri, this, operations, config);
+        FtpEndpoint result = new FtpEndpoint(uri, this, operations, config);
+        operations.setEndpoint(result);
+
+        return result;
     }
 
     protected void afterPropertiesSet(GenericFileEndpoint<FTPFile> endpoint) throws Exception {

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java Fri Feb 13 08:43:31 2009
@@ -89,17 +89,19 @@
         }
     }
 
+    @SuppressWarnings("unchecked")
     private RemoteFile<FTPFile> asRemoteFile(String directory, FTPFile file) {
-        RemoteFile<FTPFile> remote = new RemoteFile<FTPFile>();
-        remote.setFile(file);
-        remote.setFileName(file.getName());
-        remote.setFileLength(file.getSize());
+        RemoteFile<FTPFile> answer = new RemoteFile<FTPFile>();
+
+        answer.setFile(file);
+        answer.setFileName(file.getName());
+        answer.setFileLength(file.getSize());
         if (file.getTimestamp() != null) {
-            remote.setLastModified(file.getTimestamp().getTimeInMillis());
+            answer.setLastModified(file.getTimestamp().getTimeInMillis());
         }
-        remote.setHostname(((RemoteFileConfiguration) endpoint.getConfiguration()).getHost());
+        answer.setHostname(((RemoteFileConfiguration) endpoint.getConfiguration()).getHost());
         String absoluteFileName = (ObjectHelper.isNotEmpty(directory) ? directory + "/" : "") + file.getName();
-        remote.setAbsoluteFileName(absoluteFileName);
+        answer.setAbsoluteFileName(absoluteFileName);
 
         // the relative filename
         String ftpBasePath = endpoint.getConfiguration().getFile();
@@ -107,9 +109,9 @@
         if (relativePath.startsWith("/")) {
             relativePath = relativePath.substring(1);
         }
-        remote.setRelativeFileName(relativePath);
+        answer.setRelativeFileName(relativePath);
 
-        return remote;
+        return answer;
     }
 
 }

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java Fri Feb 13 08:43:31 2009
@@ -17,6 +17,8 @@
 package org.apache.camel.component.file.remote;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -40,6 +42,7 @@
 public class FtpOperations implements RemoteFileOperations<FTPFile> {
     private static final transient Log LOG = LogFactory.getLog(FtpOperations.class);
     private final FTPClient client;
+    private GenericFileEndpoint endpoint;
 
     public FtpOperations() {
         this.client = new FTPClient();
@@ -50,7 +53,7 @@
     }
 
     public void setEndpoint(GenericFileEndpoint endpoint) {
-        // noop
+        this.endpoint = endpoint;
     }
 
     public boolean connect(RemoteFileConfiguration config) throws GenericFileOperationFailedException {
@@ -167,21 +170,98 @@
     }
 
     public boolean retrieveFile(String name, GenericFileExchange<FTPFile> exchange) throws GenericFileOperationFailedException {
+        if (ObjectHelper.isNotEmpty(endpoint.getLocalWorkDirectory())) {
+            // local work directory is configured so we should store file content as files in this local directory
+            return retrieveFileToFileInLocalWorkDirectory(name, exchange);
+        } else {
+            // store file content directory as stream on the body
+            return retrieveFileToStreamInBody(name, exchange);
+        }
+    }
+
+    private boolean retrieveFileToStreamInBody(String name, GenericFileExchange<FTPFile> exchange) throws GenericFileOperationFailedException {
+        OutputStream os = null;
         try {
+            os = new ByteArrayOutputStream();
             GenericFile<FTPFile> target = exchange.getGenericFile();
-            OutputStream os = new ByteArrayOutputStream();
             target.setBody(os);
             return client.retrieveFile(name, os);
         } catch (IOException e) {
             throw new RemoteFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
+        } finally {
+            ObjectHelper.close(os, "retrieve: " + name, LOG);
+        }
+    }
+
+    private boolean retrieveFileToFileInLocalWorkDirectory(String name, GenericFileExchange<FTPFile> exchange) throws GenericFileOperationFailedException {
+        File temp;
+        File local = new File(endpoint.getLocalWorkDirectory());
+        OutputStream os;
+        try {
+            // use relative filename in local work directory
+            String relativeName = exchange.getGenericFile().getRelativeFileName();
+            
+            temp = new File(local, relativeName + ".inprogress");
+            local = new File(local, relativeName);
+
+            // create directory to local work file
+            local.mkdirs();
+
+            // delete any existing files
+            if (temp.exists()) {
+                if (!temp.delete()) {
+                    throw new RemoteFileOperationFailedException("Cannot delete existing local work file: " + temp);
+                }
+            }
+            if (local.exists()) {
+                if (!local.delete()) {
+                    throw new RemoteFileOperationFailedException("Cannot delete existing local work file: " + local);
+                }
+            }
+
+            // create new temp local work file
+            if (!temp.createNewFile()) {
+                throw new RemoteFileOperationFailedException("Cannot create new local work file: " + temp);
+            }
+
+            // store content as a file in the local work directory in the temp handle
+            os = new FileOutputStream(temp);
+
+            // set header with the path to the local work file
+            exchange.getIn().setHeader("CamelFileLocalWorkPath", local.getPath());
+
+        } catch (Exception e) {
+            throw new RemoteFileOperationFailedException("Cannot create new local work file: " + local);
+        }
+
+        boolean result;
+        try {
+            GenericFile<FTPFile> target = exchange.getGenericFile();
+            // store the java.io.File handle as the body
+            target.setBody(local);
+            result = client.retrieveFile(name, os);
+
+            // rename temp to local after we have retrieved the data
+            if (!temp.renameTo(local)) {
+                throw new RemoteFileOperationFailedException("Cannot rename local work file from: " + temp + " to: " + local);
+            }
+        } catch (IOException e) {
+            throw new RemoteFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
+        } finally {
+            ObjectHelper.close(os, "retrieve: " + name, LOG);
         }
+
+        return result;
     }
 
     public boolean storeFile(String name, GenericFileExchange<FTPFile> exchange) throws GenericFileOperationFailedException {
+        InputStream is = exchange.getIn().getBody(InputStream.class);
         try {
-            return client.storeFile(name, exchange.getIn().getBody(InputStream.class));
+            return client.storeFile(name, is);
         } catch (IOException e) {
             throw new RemoteFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
+        } finally {
+            ObjectHelper.close(is, "store: " + name, LOG);
         }
     }
 
@@ -221,12 +301,20 @@
         }
     }
 
+    public boolean deleteFile(String name) throws GenericFileOperationFailedException {
+        try {
+            return this.client.deleteFile(name);
+        } catch (IOException e) {
+            throw new RemoteFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
+        }
+    }
+
     private boolean buildDirectoryChunks(String dirName) throws IOException {
         final StringBuilder sb = new StringBuilder(dirName.length());
         final String[] dirs = dirName.split("\\/|\\\\");
 
         boolean success = false;
-        for (String dir : dirs) {            
+        for (String dir : dirs) {
             sb.append(dir).append('/');
             String directory = sb.toString();
 
@@ -243,21 +331,4 @@
         return success;
     }
 
-    public FTPClient changeCurrentDirectory(FTPClient client, String path) throws GenericFileOperationFailedException {
-        try {
-            client.changeWorkingDirectory(path);
-            return client;
-        } catch (IOException e) {
-            throw new RemoteFileOperationFailedException("Failed to delete [" + path + "]", e);
-        }
-    }
-
-    public boolean deleteFile(String name) throws GenericFileOperationFailedException {
-        try {
-            return this.client.deleteFile(name);
-        } catch (IOException e) {
-            throw new RemoteFileOperationFailedException("Failed to delete [" + name + "]", e);
-        }
-    }
-
 }

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpComponent.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpComponent.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpComponent.java Fri Feb 13 08:43:31 2009
@@ -50,7 +50,9 @@
         SftpConfiguration config = new SftpConfiguration(new URI(baseUri));
 
         SftpOperations operations = new SftpOperations();
-        return new SftpEndpoint(uri, this, operations, config);
+        SftpEndpoint result = new SftpEndpoint(uri, this, operations, config);
+        operations.setEndpoint(result);
+        return result;
     }
 
     protected void afterPropertiesSet(GenericFileEndpoint<ChannelSftp.LsEntry> endpoint) throws Exception {

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java Fri Feb 13 08:43:31 2009
@@ -17,6 +17,8 @@
 package org.apache.camel.component.file.remote;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -46,11 +48,12 @@
  */
 public class SftpOperations implements RemoteFileOperations<ChannelSftp.LsEntry> {
     private static final transient Log LOG = LogFactory.getLog(SftpOperations.class);
+    private GenericFileEndpoint endpoint;
     private ChannelSftp channel;
     private Session session;
 
     public void setEndpoint(GenericFileEndpoint endpoint) {
-        // noop
+        this.endpoint = endpoint;
     }
 
     public boolean connect(RemoteFileConfiguration configuration) throws GenericFileOperationFailedException {
@@ -194,9 +197,9 @@
                 }
             }
         } catch (IOException e) {
-            throw new RemoteFileOperationFailedException("Cannot build directory " + directory, e);
+            throw new RemoteFileOperationFailedException("Cannot build directory: " + directory, e);
         } catch (SftpException e) {
-            throw new RemoteFileOperationFailedException("Cannot build directory " + directory, e);
+            throw new RemoteFileOperationFailedException("Cannot build directory: " + directory, e);
         } finally {
             // change back to original directory
             if (originalDirectory != null) {
@@ -272,6 +275,16 @@
     }
 
     public boolean retrieveFile(String name, GenericFileExchange<ChannelSftp.LsEntry> exchange) throws GenericFileOperationFailedException {
+        if (ObjectHelper.isNotEmpty(endpoint.getLocalWorkDirectory())) {
+            // local work directory is configured so we should store file content as files in this local directory
+            return retrieveFileToFileInLocalWorkDirectory(name, exchange);
+        } else {
+            // store file content directory as stream on the body
+            return retrieveFileToStreamInBody(name, exchange);
+        }
+    }
+
+    private boolean retrieveFileToStreamInBody(String name, GenericFileExchange<ChannelSftp.LsEntry> exchange) throws GenericFileOperationFailedException {
         try {
             GenericFile<ChannelSftp.LsEntry> target = exchange.getGenericFile();
             OutputStream os = new ByteArrayOutputStream();
@@ -279,19 +292,79 @@
             channel.get(name, os);
             return true;
         } catch (SftpException e) {
-            throw new RemoteFileOperationFailedException("Could not retrieve the file [" + name + "]", e);
+            throw new RemoteFileOperationFailedException("Cannot retrieve file: " + name, e);
         }
     }
 
+    private boolean retrieveFileToFileInLocalWorkDirectory(String name, GenericFileExchange<ChannelSftp.LsEntry> exchange) throws GenericFileOperationFailedException {
+        File temp;
+        File local = new File(endpoint.getLocalWorkDirectory());
+        OutputStream os;
+        try {
+            // use relative filename in local work directory
+            String relativeName = exchange.getGenericFile().getRelativeFileName();
+
+            temp = new File(local, relativeName + ".inprogress");
+            local = new File(local, relativeName);
+
+            // create directory to local work file
+            local.mkdirs();
+
+            // delete any existing files
+            if (temp.exists()) {
+                if (!temp.delete()) {
+                    throw new RemoteFileOperationFailedException("Cannot delete existing local work file: " + temp);
+                }
+            }
+            if (local.exists()) {
+                if (!local.delete()) {
+                    throw new RemoteFileOperationFailedException("Cannot delete existing local work file: " + local);
+                }
+            }
+
+            // create new temp local work file
+            if (!temp.createNewFile()) {
+                throw new RemoteFileOperationFailedException("Cannot create new local work file: " + temp);
+            }
+
+            // store content as a file in the local work directory in the temp handle
+            os = new FileOutputStream(temp);
+
+            // set header with the path to the local work file
+            exchange.getIn().setHeader("CamelFileLocalWorkPath", local.getPath());
+
+        } catch (Exception e) {
+            throw new RemoteFileOperationFailedException("Cannot create new local work file: " + local);
+        }
+
+        try {
+            GenericFile<ChannelSftp.LsEntry> target = exchange.getGenericFile();
+            // store the java.io.File handle as the body
+            target.setBody(local);
+            channel.get(name, os);
+
+            // rename temp to local after we have retrieved the data
+            if (!temp.renameTo(local)) {
+                throw new RemoteFileOperationFailedException("Cannot rename local work file from: " + temp + " to: " + local);
+            }
+        } catch (SftpException e) {
+            throw new RemoteFileOperationFailedException("Cannot retrieve file: " + name, e);
+        } finally {
+            ObjectHelper.close(os, "retrieve: " + name, LOG);
+        }
+
+        return true;
+    }
+
     public boolean storeFile(String name, GenericFileExchange<ChannelSftp.LsEntry> exchange) throws GenericFileOperationFailedException {
         try {
             InputStream in = ExchangeHelper.getMandatoryInBody(exchange, InputStream.class);
             channel.put(in, name);
             return true;
         } catch (SftpException e) {
-            throw new RemoteFileOperationFailedException("Could not write the file [" + name + "]", e);
+            throw new RemoteFileOperationFailedException("Cannot store file: " + name, e);
         } catch (InvalidPayloadException e) {
-            throw new RemoteFileOperationFailedException("Could not write the file [" + name + "]", e);
+            throw new RemoteFileOperationFailedException("Cannot store file: " + name, e);
         }
     }
 

Modified: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java?rev=744032&r1=744031&r2=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java Fri Feb 13 08:43:31 2009
@@ -26,7 +26,7 @@
 
     private String getFtpUrl() {
         return "ftp://admin@localhost:" + getPort()
-                + "/idempotent?password=admin&binary=false&idempotent=true&delete=true";
+                + "/idempotent?password=admin&binary=false&idempotent=true&delete=true&delay=1000";
     }
 
     @Override

Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java (from r743796, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java&r1=743796&r2=744032&rev=744032&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerBodyAsStringTest.java (original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java Fri Feb 13 08:43:31 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.file.remote;
 
+import java.io.File;
+
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -23,18 +25,20 @@
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.NewFileComponent;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.IOConverter;
 
 /**
  * @version $Revision$
  */
-public class FtpConsumerBodyAsStringTest extends FtpServerTestSupport {
+public class FtpConsumerLocalWorkDirectoryTest extends FtpServerTestSupport {
 
     private String getFtpUrl() {
-        return "ftp://admin@localhost:" + getPort() + "/tmp4/camel?password=admin&consumer.delay=5000";
+        return "ftp://admin@localhost:" + getPort() + "/lwd/?password=admin&delay=5000&localWorkDirectory=target/lwd";
     }
 
     @Override
     protected void setUp() throws Exception {
+        deleteDirectory("target/lwd");
         super.setUp();
         prepareFtpServer();
     }
@@ -52,12 +56,23 @@
         producer.stop();
     }
 
-    public void testSingleFileTest() throws Exception {
+    public void testLocalWorkDirectory() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Hello World");
         mock.expectedMessageCount(1);
 
         assertMockEndpointsSatisfied();
+
+        Thread.sleep(200);
+
+        // now the lwd file should be deleted
+        File local = new File("target/lwd/hello.txt").getAbsoluteFile();
+        assertFalse("Local work file should have been deleted", local.exists());
+
+        // and the out file should exist
+        File out = new File("target/out/hello.txt").getAbsoluteFile();
+        assertTrue("file should exists", out.exists());
+        assertEquals("Hello World", IOConverter.toString(out));
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -65,11 +80,12 @@
             public void configure() throws Exception {
                 from(getFtpUrl()).process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
-                        String body = exchange.getIn().getBody(String.class);
+                        File body = exchange.getIn().getBody(File.class);
                         assertNotNull(body);
-                        assertEquals("Hello World", body);
+                        assertTrue("Local work file should exists", body.exists());
+                        assertEquals("target/lwd/hello.txt", body.getPath());
                     }
-                }).to("mock:result");
+                }).to("file://target/out", "mock:result");
             }
         };
     }

Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerLocalWorkDirectoryTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date