You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by lh...@apache.org on 2008/12/10 10:07:34 UTC

svn commit: r725036 - in /servicemix/sandbox/lhein/servicemix-vfs/trunk: pom.xml src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java

Author: lhein
Date: Wed Dec 10 01:07:34 2008
New Revision: 725036

URL: http://svn.apache.org/viewvc?rev=725036&view=rev
Log:
optimized the scheduling of files to be processed including a check if transmission is complete


Modified:
    servicemix/sandbox/lhein/servicemix-vfs/trunk/pom.xml
    servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java

Modified: servicemix/sandbox/lhein/servicemix-vfs/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/sandbox/lhein/servicemix-vfs/trunk/pom.xml?rev=725036&r1=725035&r2=725036&view=diff
==============================================================================
--- servicemix/sandbox/lhein/servicemix-vfs/trunk/pom.xml (original)
+++ servicemix/sandbox/lhein/servicemix-vfs/trunk/pom.xml Wed Dec 10 01:07:34 2008
@@ -53,6 +53,7 @@
           org.apache.xbean.spring.context.v2,
           org.springframework.beans.factory.xml,
           sun.misc;resolution:=optional,
+          javax.crypto;resolution:=optional,
           *
       </servicemix.osgi.import>
       <servicemix.osgi.export>

Modified: servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java?rev=725036&r1=725035&r2=725036&view=diff
==============================================================================
--- servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java (original)
+++ servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java Wed Dec 10 01:07:34 2008
@@ -18,6 +18,10 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -72,6 +76,8 @@
     private FileSystemManager fileSystemManager;
     private DefaultFileMonitor monitor;
     private ConcurrentMap<String, InputStream> openExchanges = new ConcurrentHashMap<String, InputStream>();
+    private Timer scheduleTimer;
+    private FileScheduler fileSchedulerTask;
     
     /**
      * default constructor
@@ -115,6 +121,19 @@
             file = FileObjectResolver.resolveToFileObject(getFileSystemManager(), getPath());
         }
         
+        // create timer task
+        if (fileSchedulerTask == null) {
+            fileSchedulerTask = new FileScheduler();
+        }
+        
+        // create timer 
+        if (scheduleTimer == null) {
+            scheduleTimer = new Timer("VFSFileObjectScheduler", true);
+        }
+
+        // setup timer
+        scheduleTimer.scheduleAtFixedRate(fileSchedulerTask, 0, getPeriod());
+        
         // create the monitor
         if (this.monitor == null) {
             this.monitor = new DefaultFileMonitor(this);
@@ -132,6 +151,19 @@
         }
     }
     
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.common.endpoints.SimpleEndpoint#stop()
+     */
+    @Override
+    public synchronized void stop() throws Exception {
+        this.monitor.stop();
+        
+        this.scheduleTimer.cancel();
+        this.scheduleTimer = null;
+        
+        super.stop();
+    }
+    
     /**
      * updates the last modified date time
      * 
@@ -156,22 +188,12 @@
     }
     
     /* (non-Javadoc)
-     * @see org.apache.servicemix.common.endpoints.SimpleEndpoint#stop()
-     */
-    @Override
-    public synchronized void stop() throws Exception {
-        this.monitor.stop();
-        
-        super.stop();
-    }
-    
-    /* (non-Javadoc)
      * @see org.apache.commons.vfs.FileListener#fileChanged(org.apache.commons.vfs.FileChangeEvent)
      */
     public void fileChanged(FileChangeEvent event) throws Exception {
         FileObject aFile = event.getFile();        
         logger.debug("FileChanged: " + aFile.getName().getPathDecoded());
-        processFile(aFile);        
+        fileSchedulerTask.scheduleForProcessing(aFile);
     }
 
     /* (non-Javadoc)
@@ -180,15 +202,39 @@
     public void fileCreated(FileChangeEvent event) throws Exception {
         FileObject aFile = event.getFile();        
         logger.debug("FileCreated: " + aFile.getName().getPathDecoded());
-        processFile(aFile);
+        fileSchedulerTask.scheduleForProcessing(aFile);
     }
 
     /* (non-Javadoc)
      * @see org.apache.commons.vfs.FileListener#fileDeleted(org.apache.commons.vfs.FileChangeEvent)
      */
     public void fileDeleted(FileChangeEvent event) throws Exception {
-        FileObject aFile = event.getFile();
-        logger.debug("FileDeleted: " + aFile.getName().getPathDecoded());
+        logger.debug("FileDeleted: " + event.getFile().getName().getPathDecoded());
+    }
+    
+    /**
+     * checks whether the file is fully transmitted or not
+     * 
+     * @param aFile     the file
+     * @return          true if fully transmitted
+     */
+    private boolean isFileFullyAvailable(FileObject aFile) {
+        try {
+            if (aFile.getContent() != null) {
+                long size_old = aFile.getContent().getSize();
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    // ignore
+                }                
+                long size_new = aFile.getContent().getSize();
+                
+                return (size_old == size_new);
+            } 
+        } catch (FileSystemException ex) {
+            logger.error("Can't determine size of file " + aFile.getName().getPath(), ex);
+        }        
+        return true;
     }
     
     /*
@@ -411,4 +457,49 @@
     public boolean isRecursive() {
         return this.recursive;
     }
+    
+    /**
+     * a scheduler which monitors transmission state of files and calls processing
+     * if the file is fully available
+     * 
+     * @author lhein
+     */
+    class FileScheduler extends TimerTask {
+        private ConcurrentMap<String, FileObject> scheduledFiles = new ConcurrentHashMap<String, FileObject>();
+        
+        /* (non-Javadoc)
+         * @see java.util.TimerTask#run()
+         */
+        @Override
+        public void run() {
+            List<FileObject> tempList = new LinkedList<FileObject>();
+            tempList.addAll(scheduledFiles.values());
+            
+            // working on a temp list
+            for (FileObject f : tempList) {
+                if (isFileFullyAvailable(f)) {
+                    try {
+                        // process file
+                        processFile(f);
+                        // remove from scheduled files
+                        scheduledFiles.remove(f.getName().getURI());
+                    } catch (Exception ex) {
+                        logger.error("Error processing file " + f.getName().getPath(), ex);
+                    }
+                } else {
+                    logger.debug("Skipping file " + f.getName().getPath() + " because it is not fully available yet...");
+                }                    
+            }
+        }
+        
+        /**
+         * schedules a file for processing
+         * 
+         * @param aFile the file to process
+         * @return      true if it will be processed
+         */
+        public boolean scheduleForProcessing(FileObject aFile) {
+            return scheduledFiles.putIfAbsent(aFile.getName().getURI(), aFile) == null;
+        }
+    }    
 }