You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by lh...@apache.org on 2008/12/10 11:00:03 UTC
svn commit: r725045 -
/servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java
Author: lhein
Date: Wed Dec 10 02:00:02 2008
New Revision: 725045
URL: http://svn.apache.org/viewvc?rev=725045&view=rev
Log:
changed back to traditional polling logic because the FileMonitor is not usable for non-file protocols
Modified:
servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java
Modified: servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java?rev=725045&r1=725044&r2=725045&view=diff
==============================================================================
--- servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java (original)
+++ servicemix/sandbox/lhein/servicemix-vfs/trunk/src/main/java/org/apache/servicemix/vfs/VFSPollingEndpoint.java Wed Dec 10 02:00:02 2008
@@ -18,12 +18,11 @@
import java.io.IOException;
import java.io.InputStream;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.Lock;
import javax.jbi.JBIException;
import javax.jbi.messaging.ExchangeStatus;
@@ -35,18 +34,16 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.vfs.FileChangeEvent;
import org.apache.commons.vfs.FileContent;
-import org.apache.commons.vfs.FileListener;
import org.apache.commons.vfs.FileObject;
import org.apache.commons.vfs.FileSelector;
-import org.apache.commons.vfs.FileSystemException;
import org.apache.commons.vfs.FileSystemManager;
import org.apache.commons.vfs.FileType;
-import org.apache.commons.vfs.impl.DefaultFileMonitor;
import org.apache.servicemix.common.DefaultComponent;
import org.apache.servicemix.common.ServiceUnit;
-import org.apache.servicemix.common.endpoints.ConsumerEndpoint;
+import org.apache.servicemix.common.endpoints.PollingEndpoint;
+import org.apache.servicemix.common.locks.LockManager;
+import org.apache.servicemix.common.locks.impl.SimpleLockManager;
import org.apache.servicemix.components.util.DefaultFileMarshaler;
import org.apache.servicemix.components.util.FileMarshaler;
@@ -63,21 +60,19 @@
*
* @author lhein
*/
-public class VFSPollingEndpoint extends ConsumerEndpoint implements VFSEndpointType, FileListener {
+public class VFSPollingEndpoint extends PollingEndpoint implements VFSEndpointType {
private static final Log logger = LogFactory.getLog(VFSPollingEndpoint.class);
private FileMarshaler marshaler = new DefaultFileMarshaler();
private FileObject file;
private FileSelector selector;
- private boolean deleteFile = true;
- private boolean recursive = true;
+ private Set<FileObject> workingSet = new CopyOnWriteArraySet<FileObject>();
+ private boolean deleteFile;
+ private boolean recursive;
private String path;
- private long period = 1000;
private FileSystemManager fileSystemManager;
- private DefaultFileMonitor monitor;
+ private LockManager lockManager;
private ConcurrentMap<String, InputStream> openExchanges = new ConcurrentHashMap<String, InputStream>();
- private Timer scheduleTimer;
- private FileScheduler fileSchedulerTask;
/**
* default constructor
@@ -113,6 +108,9 @@
public synchronized void start() throws Exception {
super.start();
+ // clear the set of already processed files
+ this.workingSet.clear();
+
// re-create the openExchanges map
this.openExchanges = new ConcurrentHashMap<String, InputStream>();
@@ -121,120 +119,19 @@
file = FileObjectResolver.resolveToFileObject(getFileSystemManager(), getPath());
}
- // create timer task
- if (fileSchedulerTask == null) {
- fileSchedulerTask = new FileScheduler();
- }
-
- // create timer
- if (scheduleTimer == null) {
- scheduleTimer = new Timer("VFSFileObjectScheduler", true);
- }
-
- // setup timer
- scheduleTimer.scheduleAtFixedRate(fileSchedulerTask, 0, getPeriod());
-
- // create the monitor
- if (this.monitor == null) {
- this.monitor = new DefaultFileMonitor(this);
- }
- this.monitor.setRecursive(isRecursive());
- this.monitor.setDelay(getPeriod());
- this.monitor.addFile(file);
- this.monitor.start();
-
- // update time stamps of existing files - this is needed because the
- // file monitor only recognizes changes and existing files are not
- // changed in any way and will therefore stay unrecognized
- if (file.exists()) {
- updateFileModified(file);
+ // create a lock manager
+ if (lockManager == null) {
+ lockManager = createLockManager();
}
}
- /* (non-Javadoc)
- * @see org.apache.servicemix.common.endpoints.SimpleEndpoint#stop()
- */
- @Override
- public synchronized void stop() throws Exception {
- this.monitor.stop();
-
- this.scheduleTimer.cancel();
- this.scheduleTimer = null;
-
- super.stop();
- }
-
- /**
- * updates the last modified date time
- *
- * @param aFile last modified date time
- */
- private void updateFileModified(FileObject aFile) {
- try {
- if (aFile.getContent() != null) {
- aFile.getContent().setLastModifiedTime(System.currentTimeMillis());
- }
-
- if (!aFile.getType().equals(FileType.FILE)) {
- FileObject[] files = aFile.getChildren();
- for (FileObject f : files) {
- updateFileModified(f);
- }
- }
- } catch (FileSystemException ex) {
- // ignored
- logger.debug("Failed to update time stamp of file " + aFile.getName().getPath(), ex);
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.commons.vfs.FileListener#fileChanged(org.apache.commons.vfs.FileChangeEvent)
- */
- public void fileChanged(FileChangeEvent event) throws Exception {
- FileObject aFile = event.getFile();
- logger.debug("FileChanged: " + aFile.getName().getPathDecoded());
- fileSchedulerTask.scheduleForProcessing(aFile);
- }
-
- /* (non-Javadoc)
- * @see org.apache.commons.vfs.FileListener#fileCreated(org.apache.commons.vfs.FileChangeEvent)
- */
- public void fileCreated(FileChangeEvent event) throws Exception {
- FileObject aFile = event.getFile();
- logger.debug("FileCreated: " + aFile.getName().getPathDecoded());
- fileSchedulerTask.scheduleForProcessing(aFile);
- }
-
- /* (non-Javadoc)
- * @see org.apache.commons.vfs.FileListener#fileDeleted(org.apache.commons.vfs.FileChangeEvent)
- */
- public void fileDeleted(FileChangeEvent event) throws Exception {
- logger.debug("FileDeleted: " + event.getFile().getName().getPathDecoded());
- }
-
/**
- * checks whether the file is fully transmitted or not
+ * returns the lock manager
*
- * @param aFile the file
- * @return true if fully transmitted
+ * @return the lock manager
*/
- private boolean isFileFullyAvailable(FileObject aFile) {
- try {
- if (aFile.getContent() != null) {
- long size_old = aFile.getContent().getSize();
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // ignore
- }
- long size_new = aFile.getContent().getSize();
-
- return (size_old == size_new);
- }
- } catch (FileSystemException ex) {
- logger.error("Can't determine size of file " + aFile.getName().getPath(), ex);
- }
- return true;
+ protected LockManager createLockManager() {
+ return new SimpleLockManager();
}
/*
@@ -292,6 +189,8 @@
//workingSet.remove(aFile);
// remove the open exchange
openExchanges.remove(exchange.getExchangeId());
+ // unlock the file
+ unlockAsyncFile(aFile);
}
} else {
@@ -302,24 +201,144 @@
}
/**
+ * unlock the file
+ *
+ * @param file the file to unlock
+ */
+ private void unlockAsyncFile(FileObject file) {
+ // look up the lock registered for this file's URI and release it
+ String uri = file.getName().getURI().toString();
+ Lock lock = lockManager.getLock(uri);
+ if (lock != null) {
+ try {
+ lock.unlock();
+ } catch (Exception ex) {
+ // can't release the lock
+ logger.error(ex);
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.common.endpoints.PollingEndpoint#poll()
+ */
+ @Override
+ public void poll() throws Exception {
+ // SM-192: Force close the file, so that the cached information is cleared
+ if (file != null) {
+ file.close();
+ pollFileOrDirectory(file);
+ }
+ }
+
+ /**
+ * polls an object that may be either a file or a folder
+ *
+ * @param fileOrDirectory the file or folder object
+ * @throws Exception on IO errors
+ */
+ protected void pollFileOrDirectory(FileObject fileOrDirectory) throws Exception {
+ pollFileOrDirectory(fileOrDirectory, true);
+ }
+
+ /**
+ * recursive method for processing a file or a folder
+ *
+ * @param fileOrDirectory the file or folder object
+ * @param processDir whether a folder should be polled; if false, folders are skipped
+ * @throws Exception on IO errors
+ */
+ protected void pollFileOrDirectory(FileObject fileOrDirectory, boolean processDir) throws Exception {
+ // check if it is a file object
+ if (fileOrDirectory.getType().equals(FileType.FILE)) {
+ // process the file
+ pollFile(fileOrDirectory);
+ } else if (processDir) {
+ // process the folder
+ logger.debug("Polling directory " + fileOrDirectory.getName().getPathDecoded());
+
+ FileObject[] files = null;
+ if (selector != null) {
+ files = fileOrDirectory.findFiles(selector);
+ } else {
+ files = fileOrDirectory.getChildren();
+ }
+ // process each file inside folder
+ for (FileObject f : files) {
+ // self-recursion
+ pollFileOrDirectory(f, isRecursive());
+ }
+ } else {
+ logger.debug("Skipping directory " + fileOrDirectory.getName().getPathDecoded());
+ }
+ }
+
+ /**
+ * polls a file object
+ *
+ * @param aFile the file object
+ * @throws Exception on IO errors
+ */
+ protected void pollFile(final FileObject aFile) throws Exception {
+ // try to add to set of processed files
+ if (workingSet.add(aFile)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Scheduling file " + aFile.getName().getPathDecoded() + " for processing");
+ }
+
+ // execute processing in another thread
+ getExecutor().execute(new Runnable() {
+ public void run() {
+ String uri = aFile.getName().getURI().toString();
+ Lock lock = lockManager.getLock(uri);
+ if (lock.tryLock()) {
+ processFileNow(aFile);
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Unable to acquire lock on " + aFile.getName().getURI());
+ }
+ }
+ }
+ });
+ }
+ }
+
+ /**
+ * processes a file
+ *
+ * @param aFile the file to process
+ */
+ protected void processFileNow(FileObject aFile) {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing file " + aFile.getName().getURI());
+ }
+
+ if (aFile.exists()) {
+ processFile(aFile);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to process file: " + aFile.getName().getURI() + ". Reason: " + e, e);
+ }
+ }
+
+ /**
* does the real processing logic
*
- * @param aFile the file to process
+ * @param file the file to process
* @throws Exception on processing errors
*/
- protected void processFile(FileObject aFile) throws Exception {
- logger.debug("Processing file " + aFile.getName().getPathDecoded());
-
+ protected void processFile(FileObject file) throws Exception {
// SM-192: Force close the file, so that the cached informations are cleared
- aFile.close();
+ file.close();
- String name = aFile.getName().getPathDecoded();
- FileContent content = aFile.getContent();
+ String name = file.getName().getURI();
+ FileContent content = file.getContent();
content.close();
InputStream stream = content.getInputStream();
if (stream == null) {
- throw new IOException("No input available for file " + aFile.getName().getPathDecoded());
+ throw new IOException("No input available for file!");
}
InOnly exchange = getExchangeFactory().createInOnlyExchange();
@@ -330,7 +349,7 @@
// sending the file itself along as a message property and holding on to
// the stream we opened
- exchange.getInMessage().setProperty(VFSComponent.VFS_PROPERTY, aFile);
+ exchange.getInMessage().setProperty(VFSComponent.VFS_PROPERTY, file);
this.openExchanges.put(exchange.getExchangeId(), stream);
send(exchange);
@@ -351,6 +370,23 @@
}
/**
+ * Bean defining the class implementing the file locking strategy. This bean
+ * must be an implementation of the
+ * <code>org.apache.servicemix.common.locks.LockManager</code> interface. By
+ * default, this will be set to an instance of
+ * <code>org.apache.servicemix.common.locks.impl.SimpleLockManager</code>.
+ *
+ * @param lockManager the <code>LockManager</code> implementation to use
+ */
+ public void setLockManager(LockManager lockManager) {
+ this.lockManager = lockManager;
+ }
+
+ public LockManager getLockManager() {
+ return lockManager;
+ }
+
+ /**
* Specifies a <code>FileMarshaler</code> object that will marshal file data
* into the NMR. The default file marshaller can read valid XML data.
* <code>FileMarshaler</code> objects are implementations of
@@ -430,76 +466,26 @@
}
/**
- * Specifies the period between polling cycles in millis.
- *
- * @param period The period to set as <code>long</code> value containing millis.
+ * The set of file objects that this component is currently working on
+ *
+ * @return a set of in-process file objects
*/
- public void setPeriod(long period) {
- this.period = period;
- }
-
- public long getPeriod() {
- return this.period;
+ public Set<FileObject> getWorkingSet() {
+ return workingSet;
}
- /**
- * Specifies if sub-directories are polled; if false then the poller will
- * only poll the specified directory. If the endpoint is configured to poll
- * for a specific file rather than a directory then this attribute is
- * ignored. Default is <code>true</code>.
- *
- * @param recursive a baolean specifying if sub-directories should be polled
+ /**
+ * @return Returns the recursive.
*/
- public void setRecursive(boolean recursive) {
- this.recursive = recursive;
- }
-
public boolean isRecursive() {
return this.recursive;
}
-
+
/**
- * a scheduler which monitors transmission state of files and calls processing
- * if the file is fully available
- *
- * @author lhein
+ * @param recursive The recursive to set.
*/
- class FileScheduler extends TimerTask {
- private ConcurrentMap<String, FileObject> scheduledFiles = new ConcurrentHashMap<String, FileObject>();
-
- /* (non-Javadoc)
- * @see java.util.TimerTask#run()
- */
- @Override
- public void run() {
- List<FileObject> tempList = new LinkedList<FileObject>();
- tempList.addAll(scheduledFiles.values());
-
- // working on a temp list
- for (FileObject f : tempList) {
- if (isFileFullyAvailable(f)) {
- try {
- // process file
- processFile(f);
- // remove from scheduled files
- scheduledFiles.remove(f.getName().getURI());
- } catch (Exception ex) {
- logger.error("Error processing file " + f.getName().getPath(), ex);
- }
- } else {
- logger.debug("Skipping file " + f.getName().getPath() + " because it is not fully available yet...");
- }
- }
- }
-
- /**
- * schedules a file for processing
- *
- * @param aFile the file to process
- * @return true if it will be processed
- */
- public boolean scheduleForProcessing(FileObject aFile) {
- return scheduledFiles.putIfAbsent(aFile.getName().getURI(), aFile) == null;
- }
- }
+ public void setRecursive(boolean recursive) {
+ this.recursive = recursive;
+ }
+
}