You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ru...@apache.org on 2009/08/08 20:51:04 UTC

svn commit: r802437 - in /synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs: VFSTransportListener.java VFSTransportSender.java VFSUtils.java

Author: ruwan
Date: Sat Aug  8 18:51:03 2009
New Revision: 802437

URL: http://svn.apache.org/viewvc?rev=802437&view=rev
Log:
VFS-locking implementation (need to make it configurable)

Modified:
    synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
    synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java
    synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java

Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java?rev=802437&r1=802436&r2=802437&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java Sat Aug  8 18:51:03 2009
@@ -194,17 +194,26 @@
                 if (children == null || children.length == 0) {
 
                     if (fileObject.getType() == FileType.FILE) {
-                        try {
-                            processFile(entry, fileObject);
-                            entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
-                            metrics.incrementMessagesReceived();
-                            
-                        } catch (AxisFault e) {
-                            entry.setLastPollState(PollTableEntry.FAILED);
-                            metrics.incrementFaultsReceiving();
-                        }
+                        if (VFSUtils.acquireLock(fsManager, fileObject)) {
+                            try {
+                                processFile(entry, fileObject);
+                                entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
+                                metrics.incrementMessagesReceived();
 
-                        moveOrDeleteAfterProcessing(entry, fileObject);
+                            } catch (AxisFault e) {
+                                VFSUtils.releaseLock(fsManager, fileObject);
+                                logException("Error processing File URI : "
+                                        + fileObject.getName(), e);
+                                entry.setLastPollState(PollTableEntry.FAILED);
+                                metrics.incrementFaultsReceiving();
+                            }
+
+                            moveOrDeleteAfterProcessing(entry, fileObject);
+                            VFSUtils.releaseLock(fsManager, fileObject);
+                        } else {
+                            log.debug("Couldn't get the lock for processing the file : "
+                                    + fileObject.getName());
+                        }
                     }
 
                 } else {
@@ -212,14 +221,15 @@
                     int successCount = 0;
 
                     if (log.isDebugEnabled()) {
-                        log.debug("File name pattern :" + entry.getFileNamePattern());
+                        log.debug("File name pattern : " + entry.getFileNamePattern());
                     }
                     for (FileObject child : children) {
                         if (log.isDebugEnabled()) {
-                            log.debug("Matching file :" + child.getName().getBaseName());
+                            log.debug("Matching file : " + child.getName().getBaseName());
                         }
-                        if ((entry.getFileNamePattern() != null)
-                                && (child.getName().getBaseName().matches(entry.getFileNamePattern()))) {
+                        if ((entry.getFileNamePattern() != null) && (
+                                child.getName().getBaseName().matches(entry.getFileNamePattern()))
+                                && VFSUtils.acquireLock(fsManager, child)) {
                             try {
                                 if (log.isDebugEnabled()) {
                                     log.debug("Processing file :" + child);
@@ -231,6 +241,7 @@
                                 metrics.incrementMessagesReceived();
 
                             } catch (Exception e) {
+                                VFSUtils.releaseLock(fsManager, child);
                                 logException("Error processing File URI : " + child.getName(), e);
                                 failCount++;
                                 // tell moveOrDeleteAfterProcessing() file failed
@@ -239,7 +250,12 @@
                             }
 
                             moveOrDeleteAfterProcessing(entry, child);
+                            VFSUtils.releaseLock(fsManager, child);
+                        } else {
+                            log.debug("Couldn't get the lock for processing the file : "
+                                    + child.getName());
                         }
+
                     }
 
                     if (failCount == 0 && successCount > 0) {
@@ -265,7 +281,6 @@
         } catch (FileSystemException e) {
             processFailure("Error checking for existence and readability : " + fileURI, e, entry);
         }
-
     }
 
     /**

Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java?rev=802437&r1=802436&r2=802437&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java Sat Aug  8 18:51:03 2009
@@ -135,21 +135,29 @@
                         // we need to write a file containing the message to this folder
                         FileObject responseFile = fsManager.resolveFile(replyFile,
                             VFSUtils.getFileName(msgCtx, vfsOutInfo));
+
+                        acquireLockForSending(responseFile, vfsOutInfo);
                         if (!responseFile.exists()) {
                             responseFile.createFile();
                         }
                         populateResponseFile(responseFile, msgCtx, append);
+                        VFSUtils.releaseLock(fsManager, responseFile);
 
                     } else if (replyFile.getType() == FileType.FILE) {
+                        
+                        acquireLockForSending(replyFile, vfsOutInfo);
                         populateResponseFile(replyFile, msgCtx, append);
+                        VFSUtils.releaseLock(fsManager, replyFile);
                         
                     } else {
                         handleException("Unsupported reply file type : " + replyFile.getType() +
                             " for file : " + vfsOutInfo.getOutFileURI());
                     }
                 } else {
+                    acquireLockForSending(replyFile, vfsOutInfo);
                     replyFile.createFile();
                     populateResponseFile(replyFile, msgCtx, append);
+                    VFSUtils.releaseLock(fsManager, replyFile);
                 }
             } catch (FileSystemException e) {
                 handleException("Error resolving reply file : " +
@@ -180,11 +188,33 @@
             metrics.incrementMessagesSent(msgContext);
             metrics.incrementBytesSent(msgContext, os.getByteCount());
         } catch (FileSystemException e) {
+            VFSUtils.releaseLock(fsManager, responseFile);
             metrics.incrementFaultsSending();
             handleException("IO Error while creating response file : " + responseFile.getName(), e);
         } catch (IOException e) {
+            VFSUtils.releaseLock(fsManager, responseFile);
             metrics.incrementFaultsSending();
             handleException("IO Error while creating response file : " + responseFile.getName(), e);
         }
     }
+
+    /** Retries VFSUtils.acquireLock, sleeping between tries, until locked or retries run out. */
+    private void acquireLockForSending(FileObject responseFile, VFSOutTransportInfo vfsOutInfo)
+            throws AxisFault {
+        int tryNum = 0;
+        while (!VFSUtils.acquireLock(fsManager, responseFile)) {
+            if (vfsOutInfo.getMaxRetryCount() == tryNum++) {
+                handleException("Couldn't send the message to file : " + responseFile.getName()
+                        + ", unable to acquire the lock even after " + (tryNum - 1) + " retries");
+            } else {
+                log.warn("Couldn't get the lock for the file : "
+                        + responseFile.getName() + ", retry : " + tryNum
+                        + " scheduled after : " + vfsOutInfo.getReconnectTimeout());
+                // restore the interrupt flag rather than swallowing it, so callers still see it
+                try {
+                    Thread.sleep(vfsOutInfo.getReconnectTimeout());
+                } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
+            }
+        }
+    }
 }

Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java?rev=802437&r1=802436&r2=802437&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java Sat Aug  8 18:51:03 2009
@@ -25,8 +25,15 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.vfs.FileContent;
 import org.apache.commons.vfs.FileSystemException;
+import org.apache.commons.vfs.FileObject;
+import org.apache.commons.vfs.FileSystemManager;
 
 import java.util.Map;
+import java.util.Random;
+import java.util.Arrays;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 
 public class VFSUtils extends BaseUtils {
 
@@ -92,5 +99,104 @@
       }
 
       return VFSConstants.DEFAULT_RECONNECT_TIMEOUT; 
-    }   
+    }
+
+    /**
+     * Acquires a file item lock before processing the item, so that the file is not processed
+     * while it is still being uploaded and/or is not processed by two listeners at once
+     *
+     * @param fsManager used to resolve the lock file of the file to be processed
+     * @param fo representing the file item to be processed
+     * @return boolean true if the lock has been acquired or false if not
+     */
+    public static boolean acquireLock(FileSystemManager fsManager, FileObject fo) {
+        // generate a random lock value, which is written into the lock file and read back
+        // below to find out whether it is this party that ended up holding the lock
+        Random random = new Random();
+        byte[] lockValue = String.valueOf(random.nextLong()).getBytes();
+
+        try {
+            // check whether there is an existing lock for this item, if so it is assumed
+            // to be processed by another listener (downloading) or a sender (uploading);
+            // the lock file is derived by appending a second ".lock" extension to the file URL
+            FileObject lockObject = fsManager.resolveFile(fo.getURL().toString() + ".lock");
+            if (lockObject.exists()) {
+                log.debug("There seems to be an external lock, aborting the processing of the file "
+                        + fo.getName() + ". This could possibly be due to some other party already "
+                        + "processing this file or the file is still being uploaded");
+            } else {
+
+                // write a lock file before starting the processing, to ensure that the
+                // item is not processed by any other party
+                lockObject.createFile();
+                OutputStream stream = lockObject.getContent().getOutputStream();
+                try {
+                    stream.write(lockValue);
+                    stream.flush();
+                    stream.close();
+                } catch (IOException e) {
+                    lockObject.delete();
+                    log.debug("Couldn't create the lock file before processing the file "
+                            + fo.getName(), e);
+                    return false;
+                } finally {
+                    lockObject.close();
+                }
+
+                // check whether the lock is in place and whether it is me who holds it. This is
+                // required because two processing parties may write the lock file
+                // simultaneously. It checks whether the lock file content is the same
+                // as the random lock value written above.
+                // NOTE: the exists-then-create sequence is not atomic, so this is best effort only
+                FileObject verifyingLockObject = fsManager.resolveFile(
+                        fo.getURL().toString() + ".lock");
+                if (verifyingLockObject.exists() && verifyLock(lockValue, verifyingLockObject)) {
+                    return true;
+                }
+            }
+        } catch (FileSystemException fse) {
+            log.debug("Cannot get the lock for the file : " + fo.getName() + " before processing",
+                    fse);
+        }
+        return false;
+    }
+
+    /**
+     * Releases the lock on a file by deleting its ".lock" file; failures are only logged.
+     *
+     * @param fsManager which is used to resolve the lock file of the processed file
+     * @param fo representing the processed file
+     */
+    public static void releaseLock(FileSystemManager fsManager, FileObject fo) {
+        try {
+            FileObject lockObject = fsManager.resolveFile(fo.getURL().toString() + ".lock");
+            if (lockObject.exists()) {
+                lockObject.delete();
+            }
+        } catch (FileSystemException e) {
+            log.error("Couldn't release the lock for the file : "
+                    + fo.getName() + " after processing", e);
+        }
+    }
+
+    /** Tells whether the lock file holds exactly lockValue, i.e. whether this party owns it. */
+    private static boolean verifyLock(byte[] lockValue, FileObject lockObject) {
+        try {
+            InputStream is = lockObject.getContent().getInputStream();
+            try {
+                byte[] val = new byte[lockValue.length];
+                // readFully() keeps reading until val is full; a bare read() may stop short
+                new java.io.DataInputStream(is).readFully(val);
+                if (Arrays.equals(lockValue, val) && is.read() == -1) {
+                    return true;
+                }
+                log.debug("The lock has been acquired by an another party");
+            } finally {
+                try { is.close(); } catch (IOException ignore) {} // must not mask the result
+            }
+        } catch (IOException e) { // also covers FileSystemException, an IOException subclass
+            log.debug("Couldn't verify the lock", e);
+        }
+        return false;
+    }
 }