You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by lh...@apache.org on 2009/02/12 15:30:49 UTC

svn commit: r743763 - /servicemix/components/bindings/servicemix-ftp/trunk/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java

Author: lhein
Date: Thu Feb 12 14:30:48 2009
New Revision: 743763

URL: http://svn.apache.org/viewvc?rev=743763&view=rev
Log:
fixed SM-1800

Modified:
    servicemix/components/bindings/servicemix-ftp/trunk/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java

Modified: servicemix/components/bindings/servicemix-ftp/trunk/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-ftp/trunk/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java?rev=743763&r1=743762&r2=743763&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-ftp/trunk/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-ftp/trunk/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java Thu Feb 12 14:30:48 2009
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.StringTokenizer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
@@ -66,6 +67,8 @@
     private QName targetOperation;
     private URI uri;
     private boolean stateless = true;
+    private URI archive;
+    private boolean autoCreateDirectory = true;
 
     protected class FtpData {
         final String file;
@@ -104,6 +107,14 @@
         if (changeWorkingDirectory && recursive) {
             throw new DeploymentException("changeWorkingDirectory='true' can not be set when recursive='true'");
         }
+        if (archive != null && archive.getPath() == null) {
+            throw new DeploymentException("Archive specified without path information.");
+        }            
+        if (archive != null) {
+            if (!deleteFile) {
+                throw new DeploymentException("Archive shouldn't be specified unless deleteFile='true'");
+            }
+        }
     }
 
     public void start() throws Exception {
@@ -131,12 +142,64 @@
             str += "/";
             uri = new URI(str);
         }
+        
+        // borrow client from pool
+        FTPClient ftp = borrowClient();
+        String folderName = "";
+        try {
+            StringTokenizer strTok = null;
+            if (isAutoCreateDirectory() && !ftp.changeWorkingDirectory(getWorkingPath())) {
+                // it seems the folder isn't there, so create it
+                strTok = new StringTokenizer(getWorkingPath(), "/");
+                
+                while (strTok.hasMoreTokens()) {
+                    folderName += '/';
+                    folderName += strTok.nextToken();
+                    if (!ftp.changeWorkingDirectory(folderName)) {
+                        if (ftp.makeDirectory(folderName)) {
+                            // the folder now exists
+                        } else {
+                            // unable to create the folder
+                            throw new IOException("The defined folder " + getWorkingPath() + " doesn't exist on the server and it can't be created automatically.");
+                        }
+                    }
+                }
+            }
+            folderName = "";
+            if (getArchivePath() != null) {
+                if (isAutoCreateDirectory() && !ftp.changeWorkingDirectory(getArchivePath())) {
+                    // it seems the folder isn't there, so create it
+                    strTok = new StringTokenizer(getArchivePath(), "/");
+
+                    while (strTok.hasMoreTokens()) {
+                        folderName += '/';
+                        folderName += strTok.nextToken();
+                        if (!ftp.changeWorkingDirectory(folderName)) {
+                            if (ftp.makeDirectory(folderName)) {
+                                // the folder now exists
+                            } else {
+                                // unable to create the folder
+                                throw new IOException("The defined archive folder " + getArchivePath() + " doesn't exist on the server and it can't be created automatically.");
+                            }
+                        }
+                    }
+                }
+            }
+        } finally {
+            // give back the client
+            returnClient(ftp);
+        }
+
         super.start();
     }
 
     protected LockManager createLockManager() {
         return new SimpleLockManager();
     }
+    
+    private String getArchivePath() {
+        return (archive != null && archive.getPath() != null) ? archive.getPath() : null;
+    }
 
     private String getWorkingPath() {
         return (uri != null && uri.getPath() != null) ? uri.getPath() : ".";
@@ -229,6 +292,38 @@
     public void setStateless(boolean stateless) {
         this.stateless = stateless;
     }
+    
+    /**
+     * Specifies if the endpoint should create the target directory, if it does
+     * not already exist. If you set this to <code>false</code> and the
+     * directory does not exist, the endpoint will not do anything. Default
+     * value is <code>true</code>.
+     * 
+     * @param autoCreateDirectory a boolean specifying if the endpoint creates
+     *            directories.
+     */
+    public void setAutoCreateDirectory(boolean autoCreateDirectory) {
+        this.autoCreateDirectory = autoCreateDirectory;
+    }
+
+    public boolean isAutoCreateDirectory() {
+        return autoCreateDirectory;
+    }
+    
+    /**
+     * Specifies a directory relative to the polling directory to which
+     * processed files are archived.
+     * 
+     * @param archive a <code>URI</code> object for the archive directory
+     */
+    public void setArchive(URI archive) {
+        this.archive = archive;
+    }
+
+    public URI getArchive() {
+        return archive;
+    }
+
 
     // Implementation methods
     //-------------------------------------------------------------------------
@@ -388,8 +483,14 @@
                 // check for state
                 if (exchange.getStatus() == ExchangeStatus.DONE) {
                     if (isDeleteFile()) {
-                        if (!data.ftp.deleteFile(data.file)) {
-                            throw new IOException("Could not delete file " + data.file);
+                        if (getArchivePath() != null) {
+                            // build a unique archive file name
+                            String newPath = String.format("%s/%d_%s", getArchivePath(), System.currentTimeMillis(), data.file.substring(data.file.lastIndexOf('/')+1));
+                            data.ftp.rename(data.file, newPath);
+                        } else {
+                            if (!data.ftp.deleteFile(data.file)) {
+                                throw new IOException("Could not delete file " + data.file);
+                            }
                         }
                     }
                 } else {
@@ -429,7 +530,7 @@
             lockManager.removeLock(file);
         }
     }
-
+    
     protected FTPClientPool createClientPool() throws Exception {
         FTPClientPool pool = new FTPClientPool();
         pool.afterPropertiesSet();