You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by lh...@apache.org on 2008/12/08 15:35:52 UTC

svn commit: r724363 - in /servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file: FilePollerEndpoint.java FileSenderEndpoint.java

Author: lhein
Date: Mon Dec  8 06:35:51 2008
New Revision: 724363

URL: http://svn.apache.org/viewvc?rev=724363&view=rev
Log:
Now checking whether the size of a scheduled file is still increasing (i.e. the copy has not finished yet) and skipping the file if it is still being copied.
Also reformatted the code of both endpoint classes.

Modified:
    servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
    servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java

Modified: servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java?rev=724363&r1=724362&r2=724363&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java Mon Dec  8 06:35:51 2008
@@ -42,6 +42,7 @@
 import org.apache.servicemix.common.locks.impl.SimpleLockManager;
 import org.apache.servicemix.components.util.DefaultFileMarshaler;
 import org.apache.servicemix.components.util.FileMarshaler;
+import org.apache.servicemix.util.FileUtil;
 
 /**
  * A polling endpoint that looks for a file or files in a directory and sends
@@ -51,8 +52,8 @@
  * payload. For non-XML payload, e.g. plain-text or binary files, use an
  * alternative marshaler such as the
  * <code>org.apache.servicemix.components.util.BinaryFileMa
+ * 
  * @org.apache.xbean.XBean element="poller"
- *
  * @version $Revision$
  */
 public class FilePollerEndpoint extends PollingEndpoint implements FileEndpointType {
@@ -78,17 +79,18 @@
         super(component, endpoint);
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
      * @see org.apache.servicemix.common.endpoints.PollingEndpoint#start()
      */
     @Override
     public synchronized void start() throws Exception {
         super.start();
-        
+
         // re-create the openExchanges map
         this.openExchanges = new ConcurrentHashMap<String, InputStream>();
     }
-    
+
     public void poll() throws Exception {
         pollFileOrDirectory(file);
     }
@@ -116,158 +118,144 @@
             lockManager = createLockManager();
         }
     }
-    
+
     protected LockManager createLockManager() {
         return new SimpleLockManager();
     }
 
-
     // Properties
-	// -------------------------------------------------------------------------
+    // -------------------------------------------------------------------------
 
     /**
-	 * Specifies the file or directory to be polled. If it is a directory, all
-	 * files in the directory or its sub-directories will be processed by the
-	 * endpoint. If it is a file, only files matching the filename will be
-	 * processed."
-	 * 
-	 * @param file
-	 *            a <code>File</code> object representing the directory or file
-	 *            to poll
-	 */
+     * Specifies the file or directory to be polled. If it is a directory, all
+     * files in the directory or its sub-directories will be processed by the
+     * endpoint. If it is a file, only files matching the filename will be
+     * processed.
+     * 
+     * @param file a <code>File</code> object representing the directory or file
+     *            to poll
+     */
     public void setFile(File file) {
         this.file = file;
     }
-    
-	public File getFile() {
-		return file;
-	}
+
+    public File getFile() {
+        return file;
+    }
 
     /**
-	 * Bean defining the class implementing the file locking strategy. This bean
-	 * must be an implementation of the
-	 * <code>org.apache.servicemix.locks.LockManager</code> interface. By
-	 * default, this will be set to an instances of
-	 * <code>org.apache.servicemix.common.locks.impl.SimpleLockManager</code>.
-	 * 
-	 * @param lockManager
-	 *            the <code>LockManager</code> implementation to use
-	 */
-	public void setLockManager(LockManager lockManager) {
-		this.lockManager = lockManager;
-	}
-
-	public LockManager getLockManager() {
-		return lockManager;
-	}
-    
+     * Bean defining the class implementing the file locking strategy. This bean
+     * must be an implementation of the
+     * <code>org.apache.servicemix.locks.LockManager</code> interface. By
+     * default, this will be set to an instance of
+     * <code>org.apache.servicemix.common.locks.impl.SimpleLockManager</code>.
+     * 
+     * @param lockManager the <code>LockManager</code> implementation to use
+     */
+    public void setLockManager(LockManager lockManager) {
+        this.lockManager = lockManager;
+    }
+
+    public LockManager getLockManager() {
+        return lockManager;
+    }
+
     /**
-	 * Bean defining the class implementing the file filtering strategy. This
-	 * bean must be an implementation of the <code>java.io.FileFilter</code>
-	 * interface.
-	 * 
-	 * @param filter
-	 *            a <code>FileFilter</code> implementation defining the
-	 *            endpoint's filtering logic
-	 * 
-	 */
-	public void setFilter(FileFilter filter) {
-		this.filter = filter;
-	}
-
-	public FileFilter getFilter() {
-		return filter;
-	}
+     * Bean defining the class implementing the file filtering strategy. This
+     * bean must be an implementation of the <code>java.io.FileFilter</code>
+     * interface.
+     * 
+     * @param filter a <code>FileFilter</code> implementation defining the
+     *            endpoint's filtering logic
+     */
+    public void setFilter(FileFilter filter) {
+        this.filter = filter;
+    }
+
+    public FileFilter getFilter() {
+        return filter;
+    }
 
-	
     /**
-     * Specifies if files should be deleted after they are processed. Default value is <code>true</code>.
+     * Specifies if files should be deleted after they are processed. Default
+     * value is <code>true</code>.
      * 
-	 * @param deleteFile
-	 *            a boolean specifying if the file should be deleted
-	 *            
-	 */
-	public void setDeleteFile(boolean deleteFile) {
-		this.deleteFile = deleteFile;
-	}
+     * @param deleteFile a boolean specifying if the file should be deleted
+     */
+    public void setDeleteFile(boolean deleteFile) {
+        this.deleteFile = deleteFile;
+    }
 
     public boolean isDeleteFile() {
         return deleteFile;
     }
 
-    
     /**
-	 * Specifies if sub-directories are polled; if false then the poller will
-	 * only poll the specified directory. If the endpoint is configured to poll
-	 * for a specific file rather than a directory then this attribute is
-	 * ignored. Default is <code>true</code>.
-	 * 
-	 * @param recursive
-	 *            a baolean specifying if sub-directories should be polled
-	 * 
-	 */
-	public void setRecursive(boolean recursive) {
-		this.recursive = recursive;
-	}
-	
-	public boolean isRecursive() {
-		return recursive;
-	}
+     * Specifies if sub-directories are polled; if false then the poller will
+     * only poll the specified directory. If the endpoint is configured to poll
+     * for a specific file rather than a directory then this attribute is
+     * ignored. Default is <code>true</code>.
+     * 
+     * @param recursive a boolean specifying if sub-directories should be polled
+     */
+    public void setRecursive(boolean recursive) {
+        this.recursive = recursive;
+    }
+
+    public boolean isRecursive() {
+        return recursive;
+    }
 
     /**
-	 * Specifies if the endpoint should create the target directory, if it does
-	 * not already exist. If you set this to <code>false</code> and the directory does
-	 * not exist, the endpoint will not do anything. Default value is <code>true</code>.
-	 * 
-	 * @param autoCreateDirectory
-	 *            a boolean specifying if the endpoint creates directories.
-	 * 
-	 */
-	public void setAutoCreateDirectory(boolean autoCreateDirectory) {
-		this.autoCreateDirectory = autoCreateDirectory;
-	}
-	
-	public boolean isAutoCreateDirectory() {
-		return autoCreateDirectory;
-	}
+     * Specifies if the endpoint should create the target directory, if it does
+     * not already exist. If you set this to <code>false</code> and the
+     * directory does not exist, the endpoint will not do anything. Default
+     * value is <code>true</code>.
+     * 
+     * @param autoCreateDirectory a boolean specifying if the endpoint creates
+     *            directories.
+     */
+    public void setAutoCreateDirectory(boolean autoCreateDirectory) {
+        this.autoCreateDirectory = autoCreateDirectory;
+    }
 
+    public boolean isAutoCreateDirectory() {
+        return autoCreateDirectory;
+    }
 
     /**
-	 * Specifies a <code>FileMarshaler</code> object that will marshal file data
-	 * into the NMR. The default file marshaller can read valid XML data.
-	 * <code>FileMarshaler</code> objects are implementations of
-	 * <code>org.apache.servicemix.components.util.FileMarshaler</code>.
-	 * 
-	 * @param marshaler
-	 *            a <code>FileMarshaler</code> object that can read data from
-	 *            the file system.
-	 */
-	public void setMarshaler(FileMarshaler marshaler) {
-		this.marshaler = marshaler;
-	}
-
-	public FileMarshaler getMarshaler() {
-		return marshaler;
-	}
-    
+     * Specifies a <code>FileMarshaler</code> object that will marshal file data
+     * into the NMR. The default file marshaller can read valid XML data.
+     * <code>FileMarshaler</code> objects are implementations of
+     * <code>org.apache.servicemix.components.util.FileMarshaler</code>.
+     * 
+     * @param marshaler a <code>FileMarshaler</code> object that can read data
+     *            from the file system.
+     */
+    public void setMarshaler(FileMarshaler marshaler) {
+        this.marshaler = marshaler;
+    }
+
+    public FileMarshaler getMarshaler() {
+        return marshaler;
+    }
+
     /**
-	 * Specifies a directory relative to the polling directory to which
-	 * processed files are archived.
-	 * 
-	 * @param archive
-	 *            a <code>File</code> object for the archive directory
-	 */
-	public void setArchive(File archive) {
-		this.archive = archive;
-	}
-
-	public File getArchive() {
-		return archive;
-	}
-    
-    // Implementation methods
-    //-------------------------------------------------------------------------
+     * Specifies a directory relative to the polling directory to which
+     * processed files are archived.
+     * 
+     * @param archive a <code>File</code> object for the archive directory
+     */
+    public void setArchive(File archive) {
+        this.archive = archive;
+    }
 
+    public File getArchive() {
+        return archive;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
 
     protected void pollFileOrDirectory(File fileOrDirectory) {
         pollFileOrDirectory(fileOrDirectory, true);
@@ -291,6 +279,13 @@
         if (logger.isDebugEnabled()) {
             logger.debug("Scheduling file " + aFile + " for processing");
         }
+        if (!FileUtil.isFileFullyAvailable(aFile)) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("The file " + aFile + " is still being copied. Skipping...");
+            }
+            // skip the file because it is not yet fully copied over
+            return;
+        }
         getExecutor().execute(new Runnable() {
             public void run() {
                 String uri = file.toURI().relativize(aFile.toURI()).toString();
@@ -327,10 +322,11 @@
         exchange.setInMessage(message);
         marshaler.readMessage(exchange, message, stream, file.getCanonicalPath());
 
-        //sending the file itself along as a message property and holding on to the stream we opened
+        // sending the file itself along as a message property and holding on to
+        // the stream we opened
         exchange.getInMessage().setProperty(FileComponent.FILE_PROPERTY, file);
         this.openExchanges.put(exchange.getExchangeId(), stream);
-        
+
         send(exchange);
     }
 
@@ -342,14 +338,15 @@
         // check for done or error
         if (this.openExchanges.containsKey(exchange.getExchangeId())) {
             InputStream stream = this.openExchanges.get(exchange.getExchangeId());
-            File aFile = (File) exchange.getMessage("in").getProperty(FileComponent.FILE_PROPERTY);
-            
+            File aFile = (File)exchange.getMessage("in").getProperty(FileComponent.FILE_PROPERTY);
+
             if (aFile == null) {
-                throw new JBIException("Property org.apache.servicemix.file was removed from the exchange -- unable to delete/archive the file");
+                throw new JBIException(
+                                       "Property org.apache.servicemix.file was removed from the exchange -- unable to delete/archive the file");
             }
 
             logger.debug("Releasing " + aFile.getAbsolutePath());
-            //first try to close the stream 
+            // first try to close the stream
             stream.close();
             try {
                 // check for state
@@ -362,15 +359,18 @@
                                 throw new IOException("Could not delete file " + aFile);
                             }
                         }
-                    } 
+                    }
                 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
                     Exception e = exchange.getError();
                     if (e == null) {
-                        throw new JBIException("Received an exchange with status ERROR, but no exception was set");
+                        throw new JBIException(
+                                               "Received an exchange with status ERROR, but no exception was set");
                     }
-                    logger.warn("Message in file " + aFile + " could not be handled successfully: " + e.getMessage(), e);
+                    logger.warn("Message in file " + aFile + " could not be handled successfully: "
+                                + e.getMessage(), e);
                 } else {
-                    //we should never get an ACTIVE exchange -- the File poller only sends InOnly exchanges
+                    // we should never get an ACTIVE exchange -- the File poller
+                    // only sends InOnly exchanges
                     throw new JBIException("Unexpectedly received an exchange with status ACTIVE");
                 }
             } finally {
@@ -379,18 +379,18 @@
                 // unlock the file
                 unlockAsyncFile(aFile);
             }
-            
+
         } else {
             // strange, we don't know this exchange
             logger.debug("Received unknown exchange. Will be ignored...");
             return;
-        }            
+        }
     }
-    
+
     /**
      * unlock the file
      * 
-     * @param file      the file to unlock
+     * @param file the file to unlock
      */
     private void unlockAsyncFile(File file) {
         // finally remove the file from the open exchanges list
@@ -398,7 +398,7 @@
         Lock lock = lockManager.getLock(uri);
         if (lock != null) {
             try {
-                lock.unlock();                            
+                lock.unlock();
             } catch (Exception ex) {
                 // can't release the lock
                 logger.error(ex);
@@ -411,17 +411,21 @@
      * 
      * @param src
      * @param targetDirectory
-     * @throws IOException 
+     * @throws IOException
      */
     public static void moveFile(File src, File targetDirectory) throws IOException {
-    	String targetName = src.getName();
-    	File target = new File(targetDirectory, targetName);
-    	if (target.exists() && target.isFile()) {
-    		// the file is already inside archive...we need a new file name
-    		targetName = String.format("%d_%s", System.currentTimeMillis(), src.getName()); // that should be unique
-    	}
+        String targetName = src.getName();
+        File target = new File(targetDirectory, targetName);
+        if (target.exists() && target.isFile()) {
+            // the file is already inside archive...we need a new file name
+            targetName = String.format("%d_%s", System.currentTimeMillis(), src.getName()); // that
+                                                                                            // should
+                                                                                            // be
+                                                                                            // unique
+        }
         if (!src.renameTo(new File(targetDirectory, targetName))) {
-            throw new IOException("Failed to move " + src + " to " + targetDirectory + " with new name " + targetName);
+            throw new IOException("Failed to move " + src + " to " + targetDirectory + " with new name "
+                                  + targetName);
         }
     }
 }

Modified: servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java?rev=724363&r1=724362&r2=724363&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FileSenderEndpoint.java Mon Dec  8 06:35:51 2008
@@ -27,12 +27,12 @@
 import org.apache.servicemix.components.util.FileMarshaler;
 
 /**
- * An endpoint which receives messages from the NMR and writes the message to the file system.
- *
+ * An endpoint which receives messages from the NMR and writes the message to
+ * the file system.
+ * 
  * @org.apache.xbean.XBean element="sender"
- *
  * @version $Revision: $
-*/
+ */
 public class FileSenderEndpoint extends ProviderEndpoint implements FileEndpointType {
 
     private File directory;
@@ -42,7 +42,6 @@
     private boolean autoCreateDirectory = true;
     private boolean append = true;
 
-
     public FileSenderEndpoint() {
         append = false;
     }
@@ -93,122 +92,119 @@
                     logger.error("Caught exception while closing stream on error: " + e, e);
                 }
             }
-            //cleaning up incomplete files after things went wrong
+            // cleaning up incomplete files after things went wrong
             if (!success) {
-                logger.debug("An error occurred while writing file " + newFile.getCanonicalPath() + ", deleting the invalid file");
+                logger.debug("An error occurred while writing file " + newFile.getCanonicalPath()
+                             + ", deleting the invalid file");
                 if (!newFile.delete()) {
-                    logger.warn("Unable to delete the file " + newFile.getCanonicalPath() + " after an error had occurred");
+                    logger.warn("Unable to delete the file " + newFile.getCanonicalPath()
+                                + " after an error had occurred");
                 }
             }
         }
     }
 
-    protected void processInOut(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out) throws Exception {
+    protected void processInOut(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out)
+        throws Exception {
         /** TODO list the files? */
         super.processInOut(exchange, in, out);
     }
 
     // Properties
-    //-------------------------------------------------------------------------
+    // -------------------------------------------------------------------------
 
     /**
-	 * Specifies the directory where the endpoint writes files.
-	 * 
-	 * @param directory
-	 *            a <code>File</code> object representing the directory
-	 */
-	public void setDirectory(File directory) {
-		this.directory = directory;
-	}
-	
+     * Specifies the directory where the endpoint writes files.
+     * 
+     * @param directory a <code>File</code> object representing the directory
+     */
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+
     public File getDirectory() {
         return directory;
     }
 
     /**
-	 * Specifies a <code>FileMarshaler</code> object that will marshal message
-	 * data from the NMR into a file. The default file marshaler can write
-	 * valid XML data. <code>FileMarshaler</code> objects are implementations of
-	 * <code>org.apache.servicemix.components.util.FileMarshaler</code>.
-	 * 
-	 * @param marshaler
-	 *            a <code>FileMarshaler</code> object that can write message
-	 *            data to the file system
-	 *            
-	 */
-	public void setMarshaler(FileMarshaler marshaler) {
-		this.marshaler = marshaler;
-	}
-
-	public FileMarshaler getMarshaler() {
-		return marshaler;
-	}
-
-    /**
-	 * Specifies a string to prefix to the beginning of generated temporary file
-	 * names. Temporary file names are generated when the endpoint cannot
-	 * determine the name of the file from the message.
-	 * 
-	 * @param tempFilePrefix
-	 *            a string to prefix to generated file names
-	 */         
+     * Specifies a <code>FileMarshaler</code> object that will marshal message
+     * data from the NMR into a file. The default file marshaler can write valid
+     * XML data. <code>FileMarshaler</code> objects are implementations of
+     * <code>org.apache.servicemix.components.util.FileMarshaler</code>.
+     * 
+     * @param marshaler a <code>FileMarshaler</code> object that can write
+     *            message data to the file system
+     */
+    public void setMarshaler(FileMarshaler marshaler) {
+        this.marshaler = marshaler;
+    }
+
+    public FileMarshaler getMarshaler() {
+        return marshaler;
+    }
+
+    /**
+     * Specifies a string to prefix to the beginning of generated temporary file
+     * names. Temporary file names are generated when the endpoint cannot
+     * determine the name of the file from the message.
+     * 
+     * @param tempFilePrefix a string to prefix to generated file names
+     */
     public void setTempFilePrefix(String tempFilePrefix) {
         this.tempFilePrefix = tempFilePrefix;
     }
-    
+
     public String getTempFilePrefix() {
         return tempFilePrefix;
     }
 
     /**
-	 * Specifies a string to append to generated temporary file names. Temporary
-	 * file names are generated when the endpoint cannot determine the name of
-	 * the file from the message.
-	 * 
-	 * @param tempFileSuffix
-	 *            a string to append to generated file names
-	 */
-	public void setTempFileSuffix(String tempFileSuffix) {
-		this.tempFileSuffix = tempFileSuffix;
-	}
-
-	public String getTempFileSuffix() {
-		return tempFileSuffix;
-	}
-
-    /**
-	 * Specifies if the endpoint should create the target directory if it does
-	 * not exist. If you set this to <code>false</code> and the directory does
-	 * not exist, the endpoint will not do anything. Default value: <code>true</code>.
-	 * 
-	 * @param autoCreateDirectory
-	 *            a boolean specifying if the endpoint creates directories
-	 *            
-	 */
-	public void setAutoCreateDirectory(boolean autoCreateDirectory) {
-		this.autoCreateDirectory = autoCreateDirectory;
-	}
-
-	public boolean isAutoCreateDirectory() {
-		return autoCreateDirectory;
-	}
-
-    /**
-	 * Specifies if the endpoint appends data to existing files or if it will
-	 * overwrite existing files. The default is for the endpoint to overwrite
-	 * existing files. Setting this to <code>true</code> instructs the endpoint
-	 * to append data. Default value is <code>false</code>.
-	 * 
-	 * @param append
-	 *            a boolean specifying if the endpoint appends data to existing
-	 *            files
-	 */
-	public void setAppend(boolean append) {
-		this.append = append;
-	}
-
-	public boolean isAppend() {
-		return append;
-	}
+     * Specifies a string to append to generated temporary file names. Temporary
+     * file names are generated when the endpoint cannot determine the name of
+     * the file from the message.
+     * 
+     * @param tempFileSuffix a string to append to generated file names
+     */
+    public void setTempFileSuffix(String tempFileSuffix) {
+        this.tempFileSuffix = tempFileSuffix;
+    }
+
+    public String getTempFileSuffix() {
+        return tempFileSuffix;
+    }
+
+    /**
+     * Specifies if the endpoint should create the target directory if it does
+     * not exist. If you set this to <code>false</code> and the directory does
+     * not exist, the endpoint will not do anything. Default value:
+     * <code>true</code>.
+     * 
+     * @param autoCreateDirectory a boolean specifying if the endpoint creates
+     *            directories
+     */
+    public void setAutoCreateDirectory(boolean autoCreateDirectory) {
+        this.autoCreateDirectory = autoCreateDirectory;
+    }
+
+    public boolean isAutoCreateDirectory() {
+        return autoCreateDirectory;
+    }
+
+    /**
+     * Specifies if the endpoint appends data to existing files or if it will
+     * overwrite existing files. The default is for the endpoint to overwrite
+     * existing files. Setting this to <code>true</code> instructs the endpoint
+     * to append data. Default value is <code>false</code>.
+     * 
+     * @param append a boolean specifying if the endpoint appends data to
+     *            existing files
+     */
+    public void setAppend(boolean append) {
+        this.append = append;
+    }
+
+    public boolean isAppend() {
+        return append;
+    }
 
-}
\ No newline at end of file
+}