You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by lh...@apache.org on 2008/12/10 10:07:34 UTC
svn commit: r725036 - in /servicemix/sandbox/lhein/servicemix-vfs/trunk:
pom.xml src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java
Author: lhein
Date: Wed Dec 10 01:07:34 2008
New Revision: 725036
URL: http://svn.apache.org/viewvc?rev=725036&view=rev
Log:
optimized the scheduling of files to be processed including a check if transmission is complete
Modified:
servicemix/sandbox/lhein/servicemix-vfs/trunk/pom.xml
servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java
Modified: servicemix/sandbox/lhein/servicemix-vfs/trunk/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/sandbox/lhein/servicemix-vfs/trunk/pom.xml?rev=725036&r1=725035&r2=725036&view=diff
==============================================================================
--- servicemix/sandbox/lhein/servicemix-vfs/trunk/pom.xml (original)
+++ servicemix/sandbox/lhein/servicemix-vfs/trunk/pom.xml Wed Dec 10 01:07:34 2008
@@ -53,6 +53,7 @@
org.apache.xbean.spring.context.v2,
org.springframework.beans.factory.xml,
sun.misc;resolution:=optional,
+ javax.crypto;resolution:=optional,
*
</servicemix.osgi.import>
<servicemix.osgi.export>
Modified: servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java?rev=725036&r1=725035&r2=725036&view=diff
==============================================================================
--- servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java (original)
+++ servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java Wed Dec 10 01:07:34 2008
@@ -18,6 +18,10 @@
import java.io.IOException;
import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -72,6 +76,8 @@
private FileSystemManager fileSystemManager;
private DefaultFileMonitor monitor;
private ConcurrentMap<String, InputStream> openExchanges = new ConcurrentHashMap<String, InputStream>();
+ private Timer scheduleTimer;
+ private FileScheduler fileSchedulerTask;
/**
* default constructor
@@ -115,6 +121,19 @@
file = FileObjectResolver.resolveToFileObject(getFileSystemManager(), getPath());
}
+ // create timer task
+ if (fileSchedulerTask == null) {
+ fileSchedulerTask = new FileScheduler();
+ }
+
+ // create timer
+ if (scheduleTimer == null) {
+ scheduleTimer = new Timer("VFSFileObjectScheduler", true);
+ }
+
+ // setup timer
+ scheduleTimer.scheduleAtFixedRate(fileSchedulerTask, 0, getPeriod());
+
// create the monitor
if (this.monitor == null) {
this.monitor = new DefaultFileMonitor(this);
@@ -132,6 +151,19 @@
}
}
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.common.endpoints.SimpleEndpoint#stop()
+     *
+     * Stops the file monitor, cancels the scheduler task together with its timer
+     * and finally delegates to the super class. Each resource is only touched if
+     * it has actually been created.
+     */
+    @Override
+    public synchronized void stop() throws Exception {
+        if (this.monitor != null) {
+            this.monitor.stop();
+        }
+
+        // a TimerTask can be scheduled only once; drop it together with the timer so
+        // that a later start creates a fresh task instead of re-scheduling the old
+        // one (which java.util.Timer rejects with an IllegalStateException)
+        if (this.fileSchedulerTask != null) {
+            this.fileSchedulerTask.cancel();
+            this.fileSchedulerTask = null;
+        }
+
+        if (this.scheduleTimer != null) {
+            this.scheduleTimer.cancel();
+            this.scheduleTimer = null;
+        }
+
+        super.stop();
+    }
+
/**
* updates the last modified date time
*
@@ -156,22 +188,12 @@
}
/* (non-Javadoc)
- * @see org.apache.servicemix.common.endpoints.SimpleEndpoint#stop()
- */
- @Override
- public synchronized void stop() throws Exception {
- this.monitor.stop();
-
- super.stop();
- }
-
- /* (non-Javadoc)
* @see org.apache.commons.vfs.FileListener#fileChanged(org.apache.commons.vfs.FileChangeEvent)
*/
public void fileChanged(FileChangeEvent event) throws Exception {
FileObject aFile = event.getFile();
logger.debug("FileChanged: " + aFile.getName().getPathDecoded());
- processFile(aFile);
+ fileSchedulerTask.scheduleForProcessing(aFile);
}
/* (non-Javadoc)
@@ -180,15 +202,39 @@
public void fileCreated(FileChangeEvent event) throws Exception {
FileObject aFile = event.getFile();
logger.debug("FileCreated: " + aFile.getName().getPathDecoded());
- processFile(aFile);
+ fileSchedulerTask.scheduleForProcessing(aFile);
}
/* (non-Javadoc)
* @see org.apache.commons.vfs.FileListener#fileDeleted(org.apache.commons.vfs.FileChangeEvent)
*/
public void fileDeleted(FileChangeEvent event) throws Exception {
- FileObject aFile = event.getFile();
- logger.debug("FileDeleted: " + aFile.getName().getPathDecoded());
+ logger.debug("FileDeleted: " + event.getFile().getName().getPathDecoded());
+ }
+
+    /**
+     * checks whether the file is fully transmitted or not by comparing the file
+     * size before and after a short pause
+     *
+     * @param aFile the file
+     * @return true if the size did not change during the pause, if the file has no
+     *         content object or if the size can't be determined; false if the size
+     *         changed or the pause was interrupted
+     */
+    private boolean isFileFullyAvailable(FileObject aFile) {
+        try {
+            if (aFile.getContent() != null) {
+                long sizeBefore = aFile.getContent().getSize();
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    // keep the interrupt visible to the calling thread; without the
+                    // pause the two sizes can't be compared in a meaningful way, so
+                    // the file is reported as not yet available
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+                long sizeAfter = aFile.getContent().getSize();
+
+                return sizeBefore == sizeAfter;
+            }
+        } catch (FileSystemException ex) {
+            logger.error("Can't determine size of file " + aFile.getName().getPath(), ex);
+        }
+        return true;
     }
/*
@@ -411,4 +457,49 @@
public boolean isRecursive() {
return this.recursive;
}
+
+    /**
+     * a scheduler which monitors transmission state of files and calls processing
+     * if the file is fully available
+     *
+     * @author lhein
+     */
+    class FileScheduler extends TimerTask {
+        /** files waiting to be processed, keyed by the URI of their file name */
+        private final ConcurrentMap<String, FileObject> scheduledFiles = new ConcurrentHashMap<String, FileObject>();
+
+        /* (non-Javadoc)
+         * @see java.util.TimerTask#run()
+         */
+        @Override
+        public void run() {
+            // work on a snapshot of the files which are scheduled right now
+            List<FileObject> snapshot = new LinkedList<FileObject>(scheduledFiles.values());
+
+            for (FileObject f : snapshot) {
+                try {
+                    if (!isFileFullyAvailable(f)) {
+                        logger.debug("Skipping file " + f.getName().getPath() + " because it is not fully available yet...");
+                        continue;
+                    }
+                    // process file
+                    processFile(f);
+                    // remove from scheduled files (only reached if processing succeeded)
+                    scheduledFiles.remove(f.getName().getURI());
+                } catch (Exception ex) {
+                    // nothing may escape from here: an exception thrown out of run()
+                    // terminates the timer thread and no file would be processed anymore
+                    logger.error("Error processing file " + f.getName().getPath(), ex);
+                }
+            }
+        }
+
+        /**
+         * schedules a file for processing
+         *
+         * @param aFile the file to process
+         * @return true if the file was newly scheduled, false if a file with the
+         *         same URI is already waiting to be processed
+         */
+        public boolean scheduleForProcessing(FileObject aFile) {
+            return scheduledFiles.putIfAbsent(aFile.getName().getURI(), aFile) == null;
+        }
+    }
}