You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by lh...@apache.org on 2008/12/10 11:00:03 UTC

svn commit: r725045 - /servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java

Author: lhein
Date: Wed Dec 10 02:00:02 2008
New Revision: 725045

URL: http://svn.apache.org/viewvc?rev=725045&view=rev
Log:
changed back to traditional polling logic because the FileMonitor is not usable for non-file protocols

Modified:
    servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java

Modified: servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java?rev=725045&r1=725044&r2=725045&view=diff
==============================================================================
--- servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java (original)
+++ servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java Wed Dec 10 02:00:02 2008
@@ -18,12 +18,11 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.Lock;
 
 import javax.jbi.JBIException;
 import javax.jbi.messaging.ExchangeStatus;
@@ -35,18 +34,16 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.vfs.FileChangeEvent;
 import org.apache.commons.vfs.FileContent;
-import org.apache.commons.vfs.FileListener;
 import org.apache.commons.vfs.FileObject;
 import org.apache.commons.vfs.FileSelector;
-import org.apache.commons.vfs.FileSystemException;
 import org.apache.commons.vfs.FileSystemManager;
 import org.apache.commons.vfs.FileType;
-import org.apache.commons.vfs.impl.DefaultFileMonitor;
 import org.apache.servicemix.common.DefaultComponent;
 import org.apache.servicemix.common.ServiceUnit;
-import org.apache.servicemix.common.endpoints.ConsumerEndpoint;
+import org.apache.servicemix.common.endpoints.PollingEndpoint;
+import org.apache.servicemix.common.locks.LockManager;
+import org.apache.servicemix.common.locks.impl.SimpleLockManager;
 import org.apache.servicemix.components.util.DefaultFileMarshaler;
 import org.apache.servicemix.components.util.FileMarshaler;
 
@@ -63,21 +60,19 @@
  * 
  * @author lhein
  */
-public class VFSPollingEndpoint extends ConsumerEndpoint implements VFSEndpointType, FileListener {
+public class VFSPollingEndpoint extends PollingEndpoint implements VFSEndpointType {
     private static final Log logger = LogFactory.getLog(VFSPollingEndpoint.class);
     
     private FileMarshaler marshaler = new DefaultFileMarshaler();
     private FileObject file;
     private FileSelector selector;
-    private boolean deleteFile = true;
-    private boolean recursive = true;
+    private Set<FileObject> workingSet = new CopyOnWriteArraySet<FileObject>();
+    private boolean deleteFile;
+    private boolean recursive;
     private String path;
-    private long period = 1000;
     private FileSystemManager fileSystemManager;
-    private DefaultFileMonitor monitor;
+    private LockManager lockManager;
     private ConcurrentMap<String, InputStream> openExchanges = new ConcurrentHashMap<String, InputStream>();
-    private Timer scheduleTimer;
-    private FileScheduler fileSchedulerTask;
     
     /**
      * default constructor
@@ -113,6 +108,9 @@
     public synchronized void start() throws Exception {
         super.start();
 
+        // clear the set of already processed files
+        this.workingSet.clear();
+        
         // re-create the openExchanges map
         this.openExchanges = new ConcurrentHashMap<String, InputStream>();
         
@@ -121,120 +119,19 @@
             file = FileObjectResolver.resolveToFileObject(getFileSystemManager(), getPath());
         }
         
-        // create timer task
-        if (fileSchedulerTask == null) {
-            fileSchedulerTask = new FileScheduler();
-        }
-        
-        // create timer 
-        if (scheduleTimer == null) {
-            scheduleTimer = new Timer("VFSFileObjectScheduler", true);
-        }
-
-        // setup timer
-        scheduleTimer.scheduleAtFixedRate(fileSchedulerTask, 0, getPeriod());
-        
-        // create the monitor
-        if (this.monitor == null) {
-            this.monitor = new DefaultFileMonitor(this);
-        }
-        this.monitor.setRecursive(isRecursive());
-        this.monitor.setDelay(getPeriod());
-        this.monitor.addFile(file);
-        this.monitor.start();
-        
-        // update time stamps of existing files - this is needed because the
-        // file monitor only recognizes changes and existing files are not 
-        // changed in any way and will therefore stay unrecognized
-        if (file.exists()) {
-            updateFileModified(file);                
+        // create a lock manager
+        if (lockManager == null) {
+            lockManager = createLockManager();
         }
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.servicemix.common.endpoints.SimpleEndpoint#stop()
-     */
-    @Override
-    public synchronized void stop() throws Exception {
-        this.monitor.stop();
-        
-        this.scheduleTimer.cancel();
-        this.scheduleTimer = null;
-        
-        super.stop();
-    }
-    
-    /**
-     * updates the last modified date time
-     * 
-     * @param aFile     last modified date time
-     */
-    private void updateFileModified(FileObject aFile) {
-        try {
-            if (aFile.getContent() != null) {
-                aFile.getContent().setLastModifiedTime(System.currentTimeMillis());
-            }
-            
-            if (!aFile.getType().equals(FileType.FILE)) {
-                FileObject[] files = aFile.getChildren();
-                for (FileObject f : files) {
-                    updateFileModified(f);
-                }
-            }
-        } catch (FileSystemException ex) {
-            // ignored
-            logger.debug("Failed to update time stamp of file " + aFile.getName().getPath(), ex);
-        }
-    }
-    
-    /* (non-Javadoc)
-     * @see org.apache.commons.vfs.FileListener#fileChanged(org.apache.commons.vfs.FileChangeEvent)
-     */
-    public void fileChanged(FileChangeEvent event) throws Exception {
-        FileObject aFile = event.getFile();        
-        logger.debug("FileChanged: " + aFile.getName().getPathDecoded());
-        fileSchedulerTask.scheduleForProcessing(aFile);
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.commons.vfs.FileListener#fileCreated(org.apache.commons.vfs.FileChangeEvent)
-     */
-    public void fileCreated(FileChangeEvent event) throws Exception {
-        FileObject aFile = event.getFile();        
-        logger.debug("FileCreated: " + aFile.getName().getPathDecoded());
-        fileSchedulerTask.scheduleForProcessing(aFile);
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.commons.vfs.FileListener#fileDeleted(org.apache.commons.vfs.FileChangeEvent)
-     */
-    public void fileDeleted(FileChangeEvent event) throws Exception {
-        logger.debug("FileDeleted: " + event.getFile().getName().getPathDecoded());
-    }
-    
     /**
-     * checks whether the file is fully transmitted or not
+     * returns the lock manager
      * 
-     * @param aFile     the file
-     * @return          true if fully transmitted
+     * @return  the lock manager
      */
-    private boolean isFileFullyAvailable(FileObject aFile) {
-        try {
-            if (aFile.getContent() != null) {
-                long size_old = aFile.getContent().getSize();
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException e) {
-                    // ignore
-                }                
-                long size_new = aFile.getContent().getSize();
-                
-                return (size_old == size_new);
-            } 
-        } catch (FileSystemException ex) {
-            logger.error("Can't determine size of file " + aFile.getName().getPath(), ex);
-        }        
-        return true;
+    protected LockManager createLockManager() {
+        return new SimpleLockManager();
     }
     
     /*
@@ -292,6 +189,8 @@
                 //workingSet.remove(aFile);
                 // remove the open exchange
                 openExchanges.remove(exchange.getExchangeId());
+                // unlock the file
+                unlockAsyncFile(aFile);
             }
 
         } else {
@@ -302,24 +201,144 @@
     }
     
     /**
+     * unlock the file
+     * 
+     * @param file the file to unlock
+     */
+    private void unlockAsyncFile(FileObject file) {
+        // finally remove the file from the open exchanges list
+        String uri = file.getName().getURI().toString();
+        Lock lock = lockManager.getLock(uri);
+        if (lock != null) {
+            try {
+                lock.unlock();
+            } catch (Exception ex) {
+                // can't release the lock
+                logger.error(ex);
+            }
+        }
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.common.endpoints.PollingEndpoint#poll()
+     */
+    @Override
+    public void poll() throws Exception {
+        // SM-192: Force close the file, so that the cached informations are cleared
+        if (file != null) {
+            file.close();
+            pollFileOrDirectory(file);
+        }        
+    }
+    
+    /**
+     * polls a file which is not clear to be a file or folder
+     * 
+     * @param fileOrDirectory   the file or folder object
+     * @throws Exception        on IO errors
+     */
+    protected void pollFileOrDirectory(FileObject fileOrDirectory) throws Exception {
+        pollFileOrDirectory(fileOrDirectory, true);
+    }
+    
+    /**
+     * recursive method for processing a file or a folder
+     * 
+     * @param fileOrDirectory   the file or folder object
+     * @param processDir        flag if processing should act recursive
+     * @throws Exception        on IO errors
+     */
+    protected void pollFileOrDirectory(FileObject fileOrDirectory, boolean processDir) throws Exception {
+        // check if it is a file object
+        if (fileOrDirectory.getType().equals(FileType.FILE)) {
+            // process the file
+            pollFile(fileOrDirectory); 
+        } else if (processDir) {
+            // process the folder
+            logger.debug("Polling directory " + fileOrDirectory.getName().getPathDecoded());
+            
+            FileObject[] files = null;
+            if (selector != null) {
+                files = fileOrDirectory.findFiles(selector);
+            } else {
+                files = fileOrDirectory.getChildren();
+            }
+            // process each file inside folder
+            for (FileObject f : files) {
+                // self-recursion
+                pollFileOrDirectory(f, isRecursive()); 
+            }
+        } else {
+            logger.debug("Skipping directory " + fileOrDirectory.getName().getPathDecoded());
+        }
+    }
+    
+    /**
+     * polls a file object
+     * 
+     * @param aFile     the file object
+     * @throws Exception        on IO errors
+     */
+    protected void pollFile(final FileObject aFile) throws Exception {
+        // try to add to set of processed files
+        if (workingSet.add(aFile)) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Scheduling file " + aFile.getName().getPathDecoded() + " for processing");
+            }
+            
+            // execute processing in another thread
+            getExecutor().execute(new Runnable() {
+                public void run() {
+                    String uri = aFile.getName().getURI().toString();
+                    Lock lock = lockManager.getLock(uri);
+                    if (lock.tryLock()) {
+                        processFileNow(aFile);
+                    } else {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Unable to acquire lock on " + aFile.getName().getURI());
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * processes a file
+     * 
+     * @param aFile     the file to process
+     */
+    protected void processFileNow(FileObject aFile) {
+        try {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Processing file " + aFile.getName().getURI());
+            }
+            
+            if (aFile.exists()) {
+                processFile(aFile);
+            }
+        } catch (Exception e) {
+            logger.error("Failed to process file: " + aFile.getName().getURI() + ". Reason: " + e, e);
+        }
+    }
+
+    /**
      * does the real processing logic
      * 
-     * @param aFile             the file to process
+     * @param file              the file to process
      * @throws Exception        on processing errors
      */
-    protected void processFile(FileObject aFile) throws Exception {
-        logger.debug("Processing file " + aFile.getName().getPathDecoded());
-        
+    protected void processFile(FileObject file) throws Exception {
         // SM-192: Force close the file, so that the cached informations are cleared
-        aFile.close();
+        file.close();
         
-        String name = aFile.getName().getPathDecoded();
-        FileContent content = aFile.getContent();
+        String name = file.getName().getURI();
+        FileContent content = file.getContent();
         content.close();
        
         InputStream stream = content.getInputStream();
         if (stream == null) {
-            throw new IOException("No input available for file " + aFile.getName().getPathDecoded());
+            throw new IOException("No input available for file!");
         }
         
         InOnly exchange = getExchangeFactory().createInOnlyExchange();
@@ -330,7 +349,7 @@
         
         // sending the file itself along as a message property and holding on to
         // the stream we opened
-        exchange.getInMessage().setProperty(VFSComponent.VFS_PROPERTY, aFile);
+        exchange.getInMessage().setProperty(VFSComponent.VFS_PROPERTY, file);
         this.openExchanges.put(exchange.getExchangeId(), stream);
 
         send(exchange);
@@ -351,6 +370,23 @@
     }
 
     /**
+     * Bean defining the class implementing the file locking strategy. This bean
+     * must be an implementation of the
+     * <code>org.apache.servicemix.locks.LockManager</code> interface. By
+     * default, this will be set to an instances of
+     * <code>org.apache.servicemix.common.locks.impl.SimpleLockManager</code>.
+     * 
+     * @param lockManager the <code>LockManager</code> implementation to use
+     */
+    public void setLockManager(LockManager lockManager) {
+        this.lockManager = lockManager;
+    }
+
+    public LockManager getLockManager() {
+        return lockManager;
+    }
+    
+    /**
      * Specifies a <code>FileMarshaler</code> object that will marshal file data
      * into the NMR. The default file marshaller can read valid XML data.
      * <code>FileMarshaler</code> objects are implementations of
@@ -430,76 +466,26 @@
     }
 
     /**
-     * Specifies the period between polling cycles in millis.
-     * 
-     * @param period The period to set as <code>long</code> value containing millis.
+     * The set of FTPFiles that this component is currently working on
+     *
+     * @return  a set of in-process file objects
      */
-    public void setPeriod(long period) {
-        this.period = period;
-    }
-
-    public long getPeriod() {
-        return this.period;
+    public Set<FileObject> getWorkingSet() {
+        return workingSet;
     }
     
-    /**
-     * Specifies if sub-directories are polled; if false then the poller will
-     * only poll the specified directory. If the endpoint is configured to poll
-     * for a specific file rather than a directory then this attribute is
-     * ignored. Default is <code>true</code>.
-     * 
-     * @param recursive a baolean specifying if sub-directories should be polled
+    /** 
+     * @return Returns the recursive.
      */
-    public void setRecursive(boolean recursive) {
-        this.recursive = recursive;
-    }
-
     public boolean isRecursive() {
         return this.recursive;
     }
-    
+
     /**
-     * a scheduler which monitors transmission state of files and calls processing
-     * if the file is fully available
-     * 
-     * @author lhein
+     * @param recursive The recursive to set.
      */
-    class FileScheduler extends TimerTask {
-        private ConcurrentMap<String, FileObject> scheduledFiles = new ConcurrentHashMap<String, FileObject>();
-        
-        /* (non-Javadoc)
-         * @see java.util.TimerTask#run()
-         */
-        @Override
-        public void run() {
-            List<FileObject> tempList = new LinkedList<FileObject>();
-            tempList.addAll(scheduledFiles.values());
-            
-            // working on a temp list
-            for (FileObject f : tempList) {
-                if (isFileFullyAvailable(f)) {
-                    try {
-                        // process file
-                        processFile(f);
-                        // remove from scheduled files
-                        scheduledFiles.remove(f.getName().getURI());
-                    } catch (Exception ex) {
-                        logger.error("Error processing file " + f.getName().getPath(), ex);
-                    }
-                } else {
-                    logger.debug("Skipping file " + f.getName().getPath() + " because it is not fully available yet...");
-                }                    
-            }
-        }
-        
-        /**
-         * schedules a file for processing
-         * 
-         * @param aFile the file to process
-         * @return      true if it will be processed
-         */
-        public boolean scheduleForProcessing(FileObject aFile) {
-            return scheduledFiles.putIfAbsent(aFile.getName().getURI(), aFile) == null;
-        }
-    }    
+    public void setRecursive(boolean recursive) {
+        this.recursive = recursive;
+    }
+
 }