You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ra...@apache.org on 2011/05/30 14:53:27 UTC

svn commit: r1129146 - in /synapse/trunk/java/modules/transports/core/vfs: ./ src/main/java/org/apache/synapse/transport/vfs/

Author: rajikak
Date: Mon May 30 12:53:27 2011
New Revision: 1129146

URL: http://svn.apache.org/viewvc?rev=1129146&view=rev
Log:
Fixed SYNAPSE-771.

Modified:
    synapse/trunk/java/modules/transports/core/vfs/pom.xml
    synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
    synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java
    synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
    synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java

Modified: synapse/trunk/java/modules/transports/core/vfs/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/pom.xml?rev=1129146&r1=1129145&r2=1129146&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/vfs/pom.xml (original)
+++ synapse/trunk/java/modules/transports/core/vfs/pom.xml Mon May 30 12:53:27 2011
@@ -53,6 +53,7 @@
                             !org.apache.commons.io.output,
                             org.apache.commons.io.output; version=0.0.0,
                             org.apache.commons.vfs.*; version=0.0.0,
+							org.apache.commons.io; version=0.0.0,
                             *; resolution:=optional
                         </Import-Package>
                     </instructions>

Modified: synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java?rev=1129146&r1=1129145&r2=1129146&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java (original)
+++ synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java Mon May 30 12:53:27 2011
@@ -69,11 +69,41 @@ public class PollTableEntry extends Abst
     private long reconnectTimeout;
     private boolean fileLocking;
 
+    private String moveAfterMoveFailure;
+
+    private int nextRetryDurationForFailedMove;
+
+    private String failedRecordFileName;
+
+    private String failedRecordFileDestination;
+
+    private String failedRecordTimestampFormat;
+
     public PollTableEntry(boolean fileLocking) {
         this.fileLocking = fileLocking;
     }
 
-    @Override
+      /**
+       * Returns the value of the VFSConstants.TRANSPORT_FILE_MOVE_AFTER_FAILED_MOVE parameter,
+       * i.e. the folder a file is moved to after its regular move failed. No default is applied
+       * when the configuration is loaded, so this is presumably null when the parameter is not
+       * set (the transport listener checks for null before using it).
+       */
+      public String getMoveAfterMoveFailure() {
+            return moveAfterMoveFailure;
+      }
+
+      /**
+       * Returns the time, in milliseconds, to wait before a failed move is retried. Read from
+       * the VFSConstants.TRANSPORT_FAILED_RECORD_NEXT_RETRY_DURATION parameter; defaults to
+       * VFSConstants.DEFAULT_NEXT_RETRY_DURATION.
+       */
+      public int getNextRetryDuration() {
+            return nextRetryDurationForFailedMove;
+      }
+
+      /**
+       * Returns the name of the file in which failed moves are recorded. Read from the
+       * VFSConstants.TRANSPORT_FAILED_RECORDS_FILE_NAME parameter; defaults to
+       * VFSConstants.DEFAULT_FAILED_RECORDS_FILE_NAME.
+       */
+      public String getFailedRecordFileName() {
+            return failedRecordFileName;
+      }
+
+      /**
+       * Returns the location of the failed-record file. Read from the
+       * VFSConstants.TRANSPORT_FAILED_RECORDS_FILE_DESTINATION parameter; defaults to
+       * VFSConstants.DEFAULT_FAILED_RECORDS_FILE_DESTINATION. The transport listener builds the
+       * record file path by plain concatenation of this value and the record file name, so the
+       * value has to end with a path separator.
+       */
+      public String getFailedRecordFileDestination() {
+            return failedRecordFileDestination;
+      }
+
+      /**
+       * Returns the pattern used to format the time stamp stored with a failed record. Read
+       * from the VFSConstants.TRANSPORT_FAILED_RECORD_TIMESTAMP_FORMAT parameter; defaults to
+       * VFSConstants.DEFAULT_TRANSPORT_FAILED_RECORD_TIMESTAMP_FORMAT. The transport listener
+       * passes it to VFSUtils.getSystemTime(), which hands it to SimpleDateFormat.
+       */
+      public String getFailedRecordTimestampFormat() {
+            return failedRecordTimestampFormat;
+      }
+
+      @Override
     public EndpointReference[] getEndpointReferences(AxisService service, String ip) {
         return new EndpointReference[] { new EndpointReference("vfs:" + fileURI) };
     }
@@ -244,6 +274,34 @@ public class PollTableEntry extends Abst
             } else if (VFSConstants.TRANSPORT_FILE_LOCKING_DISABLED.equals(strFileLocking)) {
                 fileLocking = false;
             }
+
+            moveAfterMoveFailure = ParamUtils.getOptionalParam(params,
+                    VFSConstants.TRANSPORT_FILE_MOVE_AFTER_FAILED_MOVE);
+
+            String nextRetryDuration = ParamUtils.getOptionalParam(
+                    params, VFSConstants.TRANSPORT_FAILED_RECORD_NEXT_RETRY_DURATION);
+            nextRetryDurationForFailedMove = nextRetryDuration != null ? Integer.parseInt(nextRetryDuration) :
+                    VFSConstants.DEFAULT_NEXT_RETRY_DURATION;
+
+            failedRecordFileName = ParamUtils.getOptionalParam(params,
+                    VFSConstants.TRANSPORT_FAILED_RECORDS_FILE_NAME);
+            if (failedRecordFileName == null) {
+                failedRecordFileName = VFSConstants.DEFAULT_FAILED_RECORDS_FILE_NAME;
+            }
+
+            failedRecordFileDestination = ParamUtils.getOptionalParam(params,
+                    VFSConstants.TRANSPORT_FAILED_RECORDS_FILE_DESTINATION);
+
+            if (failedRecordFileDestination == null) {
+                failedRecordFileDestination = VFSConstants.DEFAULT_FAILED_RECORDS_FILE_DESTINATION;
+            }
+
+            failedRecordTimestampFormat = ParamUtils.getOptionalParam(params,
+                    VFSConstants.TRANSPORT_FAILED_RECORD_TIMESTAMP_FORMAT);
+            if (failedRecordTimestampFormat == null) {
+                failedRecordTimestampFormat =
+                        VFSConstants.DEFAULT_TRANSPORT_FAILED_RECORD_TIMESTAMP_FORMAT;
+            }
             
             return super.loadConfiguration(params);
         }

Modified: synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java?rev=1129146&r1=1129145&r2=1129146&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java (original)
+++ synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java Mon May 30 12:53:27 2011
@@ -66,4 +66,52 @@ public final class VFSConstants {
     public static final String FILE_NAME = "FILE_NAME";
     public static final String FILE_LENGTH = "FILE_LENGTH";
     public static final String LAST_MODIFIED = "LAST_MODIFIED";
+
+      /**
+       * Files whose rename/move operation failed are recorded in a text file; this parameter
+       * configures the name of that record file
+       */
+      public static final String TRANSPORT_FAILED_RECORDS_FILE_NAME =
+              "transport.vfs.FailedRecordsFileName";
+
+      /**
+       * The default name of the file that records the failed files
+       */
+      public static final String DEFAULT_FAILED_RECORDS_FILE_NAME =
+              "vfs-move-failed-records.properties";
+
+      /**
+       * The location to place the property file that contains the failed files
+       */
+      public static final String TRANSPORT_FAILED_RECORDS_FILE_DESTINATION =
+              "transport.vfs.FailedRecordsFileDestination";
+
+      /**
+       * The default location to place the record file
+       */
+      public static final String DEFAULT_FAILED_RECORDS_FILE_DESTINATION = "repository/conf/";
+
+      /**
+       * Once a record/file is marked as failed we'll try again to move the file after this amount
+       * of milliseconds
+       */
+      public static final String TRANSPORT_FAILED_RECORD_NEXT_RETRY_DURATION =
+              "transport.vfs.FailedRecordNextRetryDuration";
+
+      public static final int DEFAULT_NEXT_RETRY_DURATION = 3000; // 3 seconds
+
+      /**
+       * Specifies the folder to which a file whose move failed is moved on the next successful
+       * attempt
+       */
+      public static final String TRANSPORT_FILE_MOVE_AFTER_FAILED_MOVE =
+              "transport.vfs.MoveAfterFailedMove";
+
+      public static final String TRANSPORT_FAILED_RECORD_TIMESTAMP_FORMAT =
+              "transport.vfs.MoveFailedRecordTimestampFormat";
+
+      public static final String DEFAULT_TRANSPORT_FAILED_RECORD_TIMESTAMP_FORMAT =
+              "dd/MM/yyyy/ HH:mm:ss";
+
+      public static final String FAILED_RECORD_DELIMITER = " ";
 }

Modified: synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java?rev=1129146&r1=1129145&r2=1129146&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java Mon May 30 12:53:27 2011
@@ -34,16 +34,17 @@ import org.apache.axis2.transport.base.A
 import org.apache.axis2.transport.base.BaseConstants;
 import org.apache.axis2.transport.base.BaseUtils;
 import org.apache.axis2.transport.base.ManagementSupport;
+import org.apache.axis2.transport.base.threads.WorkerPool;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.vfs.*;
 import org.apache.commons.vfs.impl.StandardFileSystemManager;
 
 import javax.mail.internet.ContentType;
 import javax.mail.internet.ParseException;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 /**
  * The "vfs" transport is a polling based transport - i.e. it gets kicked off at
@@ -118,6 +119,14 @@ public class VFSTransportListener extend
      */
     private boolean globalFileLockingFlag = true;
 
+    /**
+     * Worker pool on which FileRemoveTask instances are executed; assigned from
+     * super.workerPool in doInit().
+     */
+    private WorkerPool workerPool = null;
+
+    /** removeTaskState value: no FileRemoveTask is currently retrying a failed move. */
+    private static final int STATE_STOPPED = 0;
+
+    /** removeTaskState value: a FileRemoveTask is retrying a failed move. */
+    private static final int STATE_RUNNING = 1;
+
+    /**
+     * Written by FileRemoveTask and read by the poll loop, which schedules a new
+     * FileRemoveTask only while the state is STATE_STOPPED.
+     */
+    private volatile int removeTaskState = STATE_STOPPED;
+
     @Override
     protected void doInit() throws AxisFault {
         super.doInit();
@@ -125,6 +134,7 @@ public class VFSTransportListener extend
             StandardFileSystemManager fsm = new StandardFileSystemManager();
             fsm.setConfiguration(getClass().getClassLoader().getResource("providers.xml"));
             fsm.init();
+            this.workerPool = super.workerPool;
             fsManager = fsm;
             Parameter lockFlagParam = getTransportInDescription().getParameter(VFSConstants.TRANSPORT_FILE_LOCKING);
             if (lockFlagParam != null) {
@@ -204,8 +214,12 @@ public class VFSTransportListener extend
 
                 // if this is a file that would translate to a single message
                 if (children == null || children.length == 0) {
+                    boolean isFailedRecord = false;
+                    if (entry.getMoveAfterMoveFailure() != null) {
+                        isFailedRecord = isFailedRecord(fileObject, entry);
+                    }
 
-                    if (fileObject.getType() == FileType.FILE) {
+                    if (fileObject.getType() == FileType.FILE && !isFailedRecord) {
                         if (!entry.isFileLockingEnabled() || (entry.isFileLockingEnabled() &&
                                 VFSUtils.acquireLock(fsManager, fileObject))) {
                             try {
@@ -214,22 +228,46 @@ public class VFSTransportListener extend
                                 metrics.incrementMessagesReceived();
 
                             } catch (AxisFault e) {
-                                if (entry.isFileLockingEnabled()) {
-                                    VFSUtils.releaseLock(fsManager, fileObject);
-                                }
                                 logException("Error processing File URI : "
                                         + fileObject.getName(), e);
                                 entry.setLastPollState(PollTableEntry.FAILED);
                                 metrics.incrementFaultsReceiving();
                             }
 
-                            moveOrDeleteAfterProcessing(entry, fileObject);
+                            try {
+                                  moveOrDeleteAfterProcessing(entry, fileObject);
+                            } catch (AxisFault axisFault) {
+
+                                  logException("File object '" + fileObject.getURL().toString() + "' " +
+                                            "cloud not be moved", axisFault);
+                                  entry.setLastPollState(PollTableEntry.FAILED);
+                                  String timeStamp =
+                                            VFSUtils.getSystemTime(entry.getFailedRecordTimestampFormat());
+                                  addFailedRecord(entry, fileObject, timeStamp);
+                            }
                             if (entry.isFileLockingEnabled()) {
-                                VFSUtils.releaseLock(fsManager, fileObject);
+                                   VFSUtils.releaseLock(fsManager, fileObject);
+                                    if (log.isDebugEnabled()) {
+                                    log.debug("Removed the lock file '" + fileObject.toString() +
+                                            ".lock' of the file '" + fileObject.toString());
+                              }
                             }
                         } else if (log.isDebugEnabled()) {
                             log.debug("Couldn't get the lock for processing the file : "
                                     + fileObject.getName());
+                        } else if (isFailedRecord) {
+                            if (entry.isFileLockingEnabled()) {
+                                VFSUtils.releaseLock(fsManager, fileObject);
+                            }
+                            // schedule a cleanup task if the file is there
+                            if (fsManager.resolveFile(fileObject.getURL().toString()) != null &&
+                                    removeTaskState == STATE_STOPPED && entry.getMoveAfterMoveFailure() != null) {
+                                workerPool.execute(new FileRemoveTask(entry, fileObject));
+                            }
+                            if (log.isDebugEnabled()) {
+                                log.debug("File '" + fileObject.getURL() + "' has been marked as a failed" +
+                                        " record, it will not process");
+                            }
                         }
                     }
 
@@ -241,13 +279,18 @@ public class VFSTransportListener extend
                         log.debug("File name pattern : " + entry.getFileNamePattern());
                     }
                     for (FileObject child : children) {
+                        boolean isFailedRecord = false;
+                        if (entry.getMoveAfterMoveFailure() != null) {
+                            isFailedRecord = isFailedRecord(child, entry);
+                        }
                         if (log.isDebugEnabled()) {
                             log.debug("Matching file : " + child.getName().getBaseName());
                         }
                         if ((entry.getFileNamePattern() != null) && (
                                 child.getName().getBaseName().matches(entry.getFileNamePattern()))
                                 && (!entry.isFileLockingEnabled() || (entry.isFileLockingEnabled()
-                                && VFSUtils.acquireLock(fsManager, child)))) {
+                                && VFSUtils.acquireLock(fsManager, child))) &&
+                                !isFailedRecord) {
                             try {
                                 if (log.isDebugEnabled()) {
                                     log.debug("Processing file :" + child);
@@ -259,9 +302,6 @@ public class VFSTransportListener extend
                                 metrics.incrementMessagesReceived();
 
                             } catch (Exception e) {
-                                if (entry.isFileLockingEnabled()) {
-                                    VFSUtils.releaseLock(fsManager, child);
-                                }
                                 logException("Error processing File URI : " + child.getName(), e);
                                 failCount++;
                                 // tell moveOrDeleteAfterProcessing() file failed
@@ -269,13 +309,39 @@ public class VFSTransportListener extend
                                 metrics.incrementFaultsReceiving();
                             }
 
-                            moveOrDeleteAfterProcessing(entry, child);
-                            if (entry.isFileLockingEnabled()) {
+                              try {
+                                    moveOrDeleteAfterProcessing(entry, child);
+                              } catch (AxisFault axisFault) {
+                                    logException("File object '" + child.getURL().toString() +
+                                            "'cloud not be moved", axisFault);
+                                    failCount++;
+                                    entry.setLastPollState(PollTableEntry.FAILED);
+                                    String timeStamp =
+                                            VFSUtils.getSystemTime(entry.getFailedRecordTimestampFormat());
+                                    addFailedRecord(entry, child, timeStamp);
+                              }
+                              if (entry.isFileLockingEnabled()) {
                                 VFSUtils.releaseLock(fsManager, child);
                             }
-                        } else if (log.isDebugEnabled()) {
+                        } else if (!(!entry.isFileLockingEnabled() || (entry.isFileLockingEnabled()
+                                && VFSUtils.acquireLock(fsManager, fileObject))) &&
+                                log.isDebugEnabled()) {
                             log.debug("Couldn't get the lock for processing the file : "
                                     + child.getName());
+                        } else if(isFailedRecord){
+                              if (entry.isFileLockingEnabled()) {
+                                VFSUtils.releaseLock(fsManager, child);
+                                VFSUtils.releaseLock(fsManager, fileObject);
+                            }
+                            if (fsManager.resolveFile(child.getURL().toString()) != null &&
+                                    removeTaskState == STATE_STOPPED && entry.getMoveAfterMoveFailure() != null) {
+                                workerPool.execute(new FileRemoveTask(entry, child));
+                            }
+                            if (log.isDebugEnabled()) {
+                                log.debug("File '" + fileObject.getURL() +
+                                        "' has been marked as a failed record, it will not " +
+                                        "process");
+                            }
                         }
 
                     }
@@ -310,7 +376,8 @@ public class VFSTransportListener extend
      * @param entry the PollTableEntry for the file that has been processed
      * @param fileObject the FileObject representing the file to be moved or deleted
      */
-    private void moveOrDeleteAfterProcessing(final PollTableEntry entry, FileObject fileObject) {
+    private void moveOrDeleteAfterProcessing(final PollTableEntry entry, FileObject fileObject)
+                        throws AxisFault{
 
         String moveToDirectoryURI = null;
         try {
@@ -347,7 +414,7 @@ public class VFSTransportListener extend
                 try {
                     fileObject.moveTo(dest);
                 } catch (FileSystemException e) {
-                    log.error("Error moving file : " + fileObject + " to " + moveToDirectoryURI, e);
+                    handleException("Error moving file : " + fileObject + " to " + moveToDirectoryURI, e);
                 }
             } else {
                 try {
@@ -356,7 +423,9 @@ public class VFSTransportListener extend
                     }
                     fileObject.close();
                     if (!fileObject.delete()) {
-                        log.error("Cannot delete file : " + fileObject);
+                        String msg = "Cannot delete file : " + fileObject;
+                        log.error(msg);
+                        throw new AxisFault(msg);
                     }
                 } catch (FileSystemException e) {
                     log.error("Error deleting file : " + fileObject, e);
@@ -513,4 +582,137 @@ public class VFSTransportListener extend
     protected PollTableEntry createEndpoint() {
         return new PollTableEntry(globalFileLockingFlag);
     }
+
+    /**
+     * Records a file whose move/delete after processing failed, so that later polls can
+     * recognise it (see isFailedRecord) instead of processing it again.
+     * <p>
+     * The record is the line {@code <base name><delimiter><timestamp>} in the file whose path
+     * is the concatenation of the entry's failed-record destination and file name. The file
+     * is only written when the record is not already present. I/O failures are logged and
+     * not propagated.
+     *
+     * @param pollTableEntry the entry whose failed-record file settings are used
+     * @param failedObject   the file that could not be moved or deleted
+     * @param timeString     the already formatted time stamp stored with the record
+     */
+    private synchronized void addFailedRecord(PollTableEntry pollTableEntry,
+                                              FileObject failedObject,
+                                              String timeString) {
+        String record = failedObject.getName().getBaseName() + VFSConstants.FAILED_RECORD_DELIMITER
+                + timeString;
+        String recordFile = pollTableEntry.getFailedRecordFileDestination() +
+                pollTableEntry.getFailedRecordFileName();
+        File failedRecordFile = new File(recordFile);
+        try {
+            if (!failedRecordFile.exists()) {
+                FileUtils.writeStringToFile(failedRecordFile, record);
+            } else {
+                List<String> content = FileUtils.readLines(failedRecordFile);
+                if (content.contains(record)) {
+                    // already recorded: leave the file untouched instead of rewriting it
+                    return;
+                }
+                content.add(record);
+                FileUtils.writeLines(failedRecordFile, content);
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Added fail record '" + record + "' into the record file '"
+                        + recordFile + "'");
+            }
+        } catch (IOException e) {
+            log.fatal("Failure while writing the failed records!", e);
+        }
+    }
+
+    /**
+     * Tells whether the given file is listed in the failed-record file of the entry.
+     * <p>
+     * Each line of that file is expected to be {@code <base name><delimiter><timestamp>}, as
+     * written by addFailedRecord. A line matches when it is the base name itself or starts
+     * with the base name followed by the delimiter. The line is matched as a whole rather
+     * than tokenized, so blank lines are skipped safely and base names that contain the
+     * delimiter (a space) still match.
+     *
+     * @param fileObject the file to look up by its base name
+     * @param entry      the entry whose failed-record file settings are used
+     * @return true if a record for the file exists; false if there is none, the record file
+     *         does not exist, or it could not be read (the read error is logged)
+     */
+    private boolean isFailedRecord(FileObject fileObject, PollTableEntry entry) {
+        String failedFile = entry.getFailedRecordFileDestination() +
+                entry.getFailedRecordFileName();
+        File file = new File(failedFile);
+        if (!file.exists()) {
+            return false;
+        }
+        String baseName = fileObject.getName().getBaseName();
+        String recordPrefix = baseName + VFSConstants.FAILED_RECORD_DELIMITER;
+        try {
+            for (Object line : FileUtils.readLines(file)) {
+                String record = (String) line;
+                if (record.equals(baseName) || record.startsWith(recordPrefix)) {
+                    return true;
+                }
+            }
+        } catch (IOException e) {
+            log.fatal("Error while reading the file '" + failedFile + "'", e);
+        }
+        return false;
+    }
+
+    /**
+     * Background task that retries moving a file whose move/delete after processing failed.
+     * The target folder is the one returned by PollTableEntry#getMoveAfterMoveFailure().
+     * removeTaskState is STATE_RUNNING for as long as the task runs, so that the poll loop
+     * does not schedule another remove task in the meantime.
+     */
+    private class FileRemoveTask implements Runnable {
+        private final FileObject failedFileObject;
+        private final PollTableEntry pollTableEntry;
+
+        /**
+         * @param pollTableEntry the entry that supplies the target folder and retry duration
+         * @param fileObject     the file that has to be moved out of the polled location
+         */
+        public FileRemoveTask(PollTableEntry pollTableEntry, FileObject fileObject) {
+            this.pollTableEntry = pollTableEntry;
+            this.failedFileObject = fileObject;
+        }
+
+        /**
+         * Retries the move until it succeeds, sleeping for the entry's retry duration between
+         * attempts. If the worker thread is interrupted while sleeping, the interrupt flag is
+         * restored and the task gives up.
+         */
+        public void run() {
+            if (log.isDebugEnabled()) {
+                log.debug("New file remove task is starting..thread id : " +
+                        Thread.currentThread().getName());
+            }
+            int nextRetryDuration = pollTableEntry.getNextRetryDuration();
+            int count = 0;
+            // flag the task as running from the start (not only after the first failed
+            // attempt) and always reset the flag on exit, whatever the way out is
+            removeTaskState = STATE_RUNNING;
+            try {
+                while (true) {
+                    try {
+                        reTryFailedMove(pollTableEntry, failedFileObject);
+                        return;
+                    } catch (AxisFault axisFault) {
+                        try {
+                            log.error("Remove attempt '" + (count++) + "' failed for the file '" +
+                                    failedFileObject.getURL().toString() + "', next re-try will be " +
+                                    "after '" + nextRetryDuration + "' milliseconds");
+                        } catch (FileSystemException e) {
+                            log.error("Error while retrying the file url of the file object '" +
+                                    failedFileObject + "'");
+                        }
+                    }
+                    try {
+                        Thread.sleep(nextRetryDuration);
+                    } catch (InterruptedException e) {
+                        // the worker thread is being stopped: keep the interrupt flag for the
+                        // pool and give up instead of spinning in the retry loop
+                        Thread.currentThread().interrupt();
+                        log.warn("File remove task interrupted, giving up on the file '" +
+                                failedFileObject + "'");
+                        return;
+                    }
+                }
+            } finally {
+                removeTaskState = STATE_STOPPED;
+            }
+        }
+
+        /**
+         * Moves the file into the entry's "move after failed move" folder, creating the folder
+         * if it does not exist. The file name is prefixed with the formatted current time when
+         * the entry has a move timestamp format.
+         *
+         * @param entry      the entry that supplies the target folder and timestamp format
+         * @param fileObject the file to move
+         * @throws AxisFault if the move fails (raised through handleException, which
+         *                   presumably logs the message and throws — confirm)
+         */
+        private synchronized void reTryFailedMove(PollTableEntry entry, FileObject fileObject)
+                throws AxisFault {
+            try {
+
+                String moveToDirectoryURI = entry.getMoveAfterMoveFailure();
+                FileObject moveToDirectory = fsManager.resolveFile(moveToDirectoryURI);
+                if (!moveToDirectory.exists()) {
+                    moveToDirectory.createFolder();
+                }
+                String prefix;
+                if (entry.getMoveTimestampFormat() != null) {
+                    prefix = entry.getMoveTimestampFormat().format(new Date());
+                } else {
+                    prefix = "";
+                }
+                FileObject dest = moveToDirectory.resolveFile(
+                        prefix + fileObject.getName().getBaseName());
+                if (log.isDebugEnabled()) {
+                    log.debug("The failed file is moving to :" + dest.getName().getURI());
+                }
+                try {
+                    fileObject.moveTo(dest);
+                } catch (FileSystemException e) {
+                    // NOTE(review): in Axis2, AxisFault appears to extend RemoteException (an
+                    // IOException); if so, the fault raised here is caught again by the
+                    // IOException handler below and reported a second time — confirm
+                    handleException("Error moving the failed file : " + fileObject + " to " +
+                            moveToDirectoryURI, e);
+                }
+            } catch (FileSystemException e) {
+                handleException("Cloud not move the failed file object '" + fileObject + "'", e);
+            } catch (IOException e) {
+                handleException("Cloud not create the folder", e);
+            }
+        }
+    }
 }

Modified: synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java?rev=1129146&r1=1129145&r2=1129146&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java (original)
+++ synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java Mon May 30 12:53:27 2011
@@ -31,7 +31,9 @@ import org.apache.commons.vfs.FileSystem
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.text.SimpleDateFormat;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.Map;
 import java.util.Random;
 import java.util.regex.Pattern;
@@ -105,7 +107,7 @@ public class VFSUtils extends BaseUtils 
      * @param fo representing the processign file item
      * @return boolean true if the lock has been acquired or false if not
      */
-    public static boolean acquireLock(FileSystemManager fsManager, FileObject fo) {
+    public synchronized static boolean acquireLock(FileSystemManager fsManager, FileObject fo) {
         
         // generate a random lock value to ensure that there are no two parties
         // processing the same file
@@ -138,7 +140,7 @@ public class VFSUtils extends BaseUtils 
                     stream.close();
                 } catch (IOException e) {
                     lockObject.delete();
-                    log.debug("Couldn't create the lock file before processing the file "
+                    log.error("Couldn't create the lock file before processing the file "
                             + fullPath, e);
                     return false;
                 } finally {
@@ -146,7 +148,7 @@ public class VFSUtils extends BaseUtils 
                 }
 
                 // check whether the lock is in place and is it me who holds the lock. This is
-                // required because it is possible to write the lock file symultaniously by
+                // required because it is possible to write the lock file simultaneously by
                 // two processing parties. It checks whether the lock file content is the same
                 // as the written random lock value.
                 // NOTE: this may not be optimal but is sub optimal
@@ -157,12 +159,16 @@ public class VFSUtils extends BaseUtils 
                 }
             }
         } catch (FileSystemException fse) {
-            log.debug("Cannot get the lock for the file : " + maskURLPassword(fo.getName().getURI())
+            log.error("Cannot get the lock for the file : " + maskURLPassword(fo.getName().getURI())
                     + " before processing");
         }
         return false;
     }
 
+      /**
+       * Formats the current system time.
+       *
+       * @param dateFormat the pattern, in SimpleDateFormat syntax, to format the time with
+       * @return the current time formatted with the given pattern
+       */
+      public static String getSystemTime(String dateFormat) {
+            SimpleDateFormat formatter = new SimpleDateFormat(dateFormat);
+            Date now = new Date();
+            return formatter.format(now);
+      }
+
     /**
      * Release a file item lock acquired either by the VFS listener or a sender
      *
@@ -210,10 +216,10 @@ public class VFSUtils extends BaseUtils 
                 log.debug("The lock has been acquired by an another party");
             }
         } catch (FileSystemException e) {
-            log.debug("Couldn't verify the lock", e);
+            log.error("Couldn't verify the lock", e);
             return false;
         } catch (IOException e) {
-            log.debug("Couldn't verify the lock", e);
+            log.error("Couldn't verify the lock", e);
             return false;
         }
         return false;