You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ru...@apache.org on 2009/08/08 20:51:04 UTC
svn commit: r802437 - in
/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs:
VFSTransportListener.java VFSTransportSender.java VFSUtils.java
Author: ruwan
Date: Sat Aug 8 18:51:03 2009
New Revision: 802437
URL: http://svn.apache.org/viewvc?rev=802437&view=rev
Log:
VFS-locking implementation (need to make it configurable)
Modified:
synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java
synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java
Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java?rev=802437&r1=802436&r2=802437&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java Sat Aug 8 18:51:03 2009
@@ -194,17 +194,26 @@
if (children == null || children.length == 0) {
if (fileObject.getType() == FileType.FILE) {
- try {
- processFile(entry, fileObject);
- entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
- metrics.incrementMessagesReceived();
-
- } catch (AxisFault e) {
- entry.setLastPollState(PollTableEntry.FAILED);
- metrics.incrementFaultsReceiving();
- }
+ if (VFSUtils.acquireLock(fsManager, fileObject)) {
+ try {
+ processFile(entry, fileObject);
+ entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
+ metrics.incrementMessagesReceived();
- moveOrDeleteAfterProcessing(entry, fileObject);
+ } catch (AxisFault e) {
+ VFSUtils.releaseLock(fsManager, fileObject);
+ logException("Error processing File URI : "
+ + fileObject.getName(), e);
+ entry.setLastPollState(PollTableEntry.FAILED);
+ metrics.incrementFaultsReceiving();
+ }
+
+ moveOrDeleteAfterProcessing(entry, fileObject);
+ VFSUtils.releaseLock(fsManager, fileObject);
+ } else {
+ log.debug("Couldn't get the lock for processing the file : "
+ + fileObject.getName());
+ }
}
} else {
@@ -212,14 +221,15 @@
int successCount = 0;
if (log.isDebugEnabled()) {
- log.debug("File name pattern :" + entry.getFileNamePattern());
+ log.debug("File name pattern : " + entry.getFileNamePattern());
}
for (FileObject child : children) {
if (log.isDebugEnabled()) {
- log.debug("Matching file :" + child.getName().getBaseName());
+ log.debug("Matching file : " + child.getName().getBaseName());
}
- if ((entry.getFileNamePattern() != null)
- && (child.getName().getBaseName().matches(entry.getFileNamePattern()))) {
+ if ((entry.getFileNamePattern() != null) && (
+ child.getName().getBaseName().matches(entry.getFileNamePattern()))
+ && VFSUtils.acquireLock(fsManager, child)) {
try {
if (log.isDebugEnabled()) {
log.debug("Processing file :" + child);
@@ -231,6 +241,7 @@
metrics.incrementMessagesReceived();
} catch (Exception e) {
+ VFSUtils.releaseLock(fsManager, child);
logException("Error processing File URI : " + child.getName(), e);
failCount++;
// tell moveOrDeleteAfterProcessing() file failed
@@ -239,7 +250,12 @@
}
moveOrDeleteAfterProcessing(entry, child);
+ VFSUtils.releaseLock(fsManager, child);
+ } else {
+ log.debug("Couldn't get the lock for processing the file : "
+ + child.getName());
}
+
}
if (failCount == 0 && successCount > 0) {
@@ -265,7 +281,6 @@
} catch (FileSystemException e) {
processFailure("Error checking for existence and readability : " + fileURI, e, entry);
}
-
}
/**
Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java?rev=802437&r1=802436&r2=802437&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java Sat Aug 8 18:51:03 2009
@@ -135,21 +135,29 @@
// we need to write a file containing the message to this folder
FileObject responseFile = fsManager.resolveFile(replyFile,
VFSUtils.getFileName(msgCtx, vfsOutInfo));
+
+ acquireLockForSending(responseFile, vfsOutInfo);
if (!responseFile.exists()) {
responseFile.createFile();
}
populateResponseFile(responseFile, msgCtx, append);
+ VFSUtils.releaseLock(fsManager, responseFile);
} else if (replyFile.getType() == FileType.FILE) {
+
+ acquireLockForSending(replyFile, vfsOutInfo);
populateResponseFile(replyFile, msgCtx, append);
+ VFSUtils.releaseLock(fsManager, replyFile);
} else {
handleException("Unsupported reply file type : " + replyFile.getType() +
" for file : " + vfsOutInfo.getOutFileURI());
}
} else {
+ acquireLockForSending(replyFile, vfsOutInfo);
replyFile.createFile();
populateResponseFile(replyFile, msgCtx, append);
+ VFSUtils.releaseLock(fsManager, replyFile);
}
} catch (FileSystemException e) {
handleException("Error resolving reply file : " +
@@ -180,11 +188,33 @@
metrics.incrementMessagesSent(msgContext);
metrics.incrementBytesSent(msgContext, os.getByteCount());
} catch (FileSystemException e) {
+ VFSUtils.releaseLock(fsManager, responseFile);
metrics.incrementFaultsSending();
handleException("IO Error while creating response file : " + responseFile.getName(), e);
} catch (IOException e) {
+ VFSUtils.releaseLock(fsManager, responseFile);
metrics.incrementFaultsSending();
handleException("IO Error while creating response file : " + responseFile.getName(), e);
}
}
+
+ private void acquireLockForSending(FileObject responseFile, VFSOutTransportInfo vfsOutInfo)
+ throws AxisFault {
+ int tryNum = 0;
+ // wait till we get the lock
+ while (!VFSUtils.acquireLock(fsManager, responseFile)) {
+ if (vfsOutInfo.getMaxRetryCount() == tryNum++) {
+ handleException("Couldn't send the message to file : "
+ + responseFile.getName() + ", unable to acquire the " +
+ "lock even after " + tryNum + " retries");
+ } else {
+ log.warn("Couldn't get the lock for the file : "
+ + responseFile.getName() + ", retry : " + tryNum
+ + " scheduled after : " + vfsOutInfo.getReconnectTimeout());
+ try {
+ Thread.sleep(vfsOutInfo.getReconnectTimeout());
+ } catch (InterruptedException ignore) {}
+ }
+ }
+ }
}
Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java?rev=802437&r1=802436&r2=802437&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java Sat Aug 8 18:51:03 2009
@@ -25,8 +25,15 @@
import org.apache.commons.logging.LogFactory;
import org.apache.commons.vfs.FileContent;
import org.apache.commons.vfs.FileSystemException;
+import org.apache.commons.vfs.FileObject;
+import org.apache.commons.vfs.FileSystemManager;
import java.util.Map;
+import java.util.Random;
+import java.util.Arrays;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.InputStream;
public class VFSUtils extends BaseUtils {
@@ -92,5 +99,104 @@
}
return VFSConstants.DEFAULT_RECONNECT_TIMEOUT;
- }
+ }
+
+ /**
+ * Acquires a file item lock before processing the item, guaranteing that the file is not
+ * processed while it is being uploaded and/or the item is not processed by two listeners
+ *
+ * @param fsManager used to resolve the processing file
+ * @param fo representing the processign file item
+ * @return boolean true if the lock has been acquired or false if not
+ */
+ public static boolean acquireLock(FileSystemManager fsManager, FileObject fo) {
+
+ // generate a random lock value to ensure that there are no two parties
+ // processing the same file
+ Random random = new Random();
+ byte[] lockValue = String.valueOf(random.nextLong()).getBytes();
+
+ try {
+ // check whether there is an existing lock for this item, if so it is assumed
+ // to be processed by an another listener (downloading) or a sender (uploading)
+ // lock file is derived by attaching the ".lock" second extension to the file name
+ FileObject lockObject = fsManager.resolveFile(fo.getURL().toString() + ".lock");
+ if (lockObject.exists()) {
+ log.debug("There seems to be an external lock, aborting the processing of the file "
+ + fo.getName() + ". This could possibly be due to some other party already "
+ + "processing this file or the file is still being uploaded");
+ } else {
+
+ // write a lock file before starting of the processing, to ensure that the
+ // item is not processed by any other parties
+ lockObject.createFile();
+ OutputStream stream = lockObject.getContent().getOutputStream();
+ try {
+ stream.write(lockValue);
+ stream.flush();
+ stream.close();
+ } catch (IOException e) {
+ lockObject.delete();
+ log.debug("Couldn't create the lock file before processing the file "
+ + fo.getName(), e);
+ return false;
+ } finally {
+ lockObject.close();
+ }
+
+ // check whether the lock is in place and is it me who holds the lock. This is
+ // required because it is possible to write the lock file symultaniously by
+ // two processing parties. It checks whether the lock file content is the same
+ // as the written random lock value.
+ // NOTE: this may not be optimal but is sub optimal
+ FileObject verifyingLockObject = fsManager.resolveFile(
+ fo.getURL().toString() + ".lock");
+ if (verifyingLockObject.exists() && verifyLock(lockValue, verifyingLockObject)) {
+ return true;
+ }
+ }
+ } catch (FileSystemException fse) {
+ log.debug("Cannot get the lock for the file : " + fo.getName() + " before processing");
+ }
+ return false;
+ }
+
+ /**
+ * Release a file item lock acquired either by the VFS listener or a sender
+ *
+ * @param fsManager which is used to resolve the processed file
+ * @param fo representing the processed file
+ */
+ public static void releaseLock(FileSystemManager fsManager, FileObject fo) {
+ try {
+ FileObject lockObject = fsManager.resolveFile(fo.getURL().toString() + ".lock");
+ if (lockObject.exists()) {
+ lockObject.delete();
+ }
+ } catch (FileSystemException e) {
+ log.error("Couldn't release the lock for the file : "
+ + fo.getName() + " after processing");
+ }
+ }
+
+ private static boolean verifyLock(byte[] lockValue, FileObject lockObject) {
+ try {
+ InputStream is = lockObject.getContent().getInputStream();
+ byte[] val = new byte[lockValue.length];
+ // noinspection ResultOfMethodCallIgnored
+ is.read(val);
+ if (Arrays.equals(lockValue, val) && is.read() == -1) {
+ return true;
+ } else {
+ log.debug("The lock has been acquired by an another party");
+ }
+ } catch (FileSystemException e) {
+ log.debug("Couldn't verify the lock", e);
+ return false;
+ } catch (IOException e) {
+ log.debug("Couldn't verify the lock", e);
+ return false;
+ }
+ return false;
+ }
}