You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ru...@apache.org on 2009/08/09 06:12:44 UTC
svn commit: r802480 - in
/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs:
PollTableEntry.java VFSConstants.java VFSOutTransportInfo.java
VFSTransportListener.java VFSTransportSender.java
Author: ruwan
Date: Sun Aug 9 04:12:43 2009
New Revision: 802480
URL: http://svn.apache.org/viewvc?rev=802480&view=rev
Log:
making the VFS transport locking configurable at a global level (either at the sender or receiver level) as well as at a service/endpoint level.
Modified:
synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java
synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java
synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java
Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java?rev=802480&r1=802479&r2=802480&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java Sun Aug 9 04:12:43 2009
@@ -66,6 +66,11 @@
private int maxRetryCount;
private long reconnectTimeout;
+ private boolean fileLocking;
+
+ public PollTableEntry(boolean fileLocking) {
+ this.fileLocking = fileLocking;
+ }
@Override
public EndpointReference[] getEndpointReferences(String ip) {
@@ -152,6 +157,10 @@
return maxRetryCount;
}
+ public boolean isFileLockingEnabled() {
+ return fileLocking;
+ }
+
public long getReconnectTimeout() {
return reconnectTimeout;
}
@@ -226,6 +235,14 @@
reconnectTimeout = strReconnectTimeout != null ?
Integer.parseInt(strReconnectTimeout) * 1000 :
VFSConstants.DEFAULT_RECONNECT_TIMEOUT;
+
+ String strFileLocking = ParamUtils.getOptionalParam(
+ params, VFSConstants.TRANSPORT_FILE_LOCKING);
+ if (VFSConstants.TRANSPORT_FILE_LOCKING_ENABLED.equals(strFileLocking)) {
+ fileLocking = true;
+ } else if (VFSConstants.TRANSPORT_FILE_LOCKING_DISABLED.equals(strFileLocking)) {
+ fileLocking = false;
+ }
return super.loadConfiguration(params);
}
Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java?rev=802480&r1=802479&r2=802480&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java Sun Aug 9 04:12:43 2009
@@ -41,6 +41,9 @@
public static final String TRANSPORT_FILE_FILE_URI = "transport.vfs.FileURI";
public static final String TRANSPORT_FILE_FILE_NAME_PATTERN = "transport.vfs.FileNamePattern";
public static final String TRANSPORT_FILE_CONTENT_TYPE = "transport.vfs.ContentType";
+ public static final String TRANSPORT_FILE_LOCKING = "transport.vfs.Locking";
+ public static final String TRANSPORT_FILE_LOCKING_ENABLED = "enable";
+ public static final String TRANSPORT_FILE_LOCKING_DISABLED = "disable";
public static final String REPLY_FILE_URI = "transport.vfs.ReplyFileURI";
public static final String REPLY_FILE_NAME = "transport.vfs.ReplyFileName";
Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java?rev=802480&r1=802479&r2=802480&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java Sun Aug 9 04:12:43 2009
@@ -42,6 +42,7 @@
private int maxRetryCount = 3;
private long reconnectTimeout = 30000;
private boolean append;
+ private boolean fileLocking;
/**
* Constructs the VFSOutTransportInfo containing the information about the file to which the
@@ -49,14 +50,14 @@
*
* @param outFileURI URI of the file to which the message is delivered
*/
- VFSOutTransportInfo(String outFileURI) {
+ VFSOutTransportInfo(String outFileURI, boolean fileLocking) {
if (outFileURI.startsWith(VFSConstants.VFS_PREFIX)) {
this.outFileURI = outFileURI.substring(VFSConstants.VFS_PREFIX.length());
} else {
this.outFileURI = outFileURI;
}
-
+
Map<String,String> properties = BaseUtils.getEPRProperties(outFileURI);
if (properties.containsKey(VFSConstants.MAX_RETRY_COUNT)) {
String strMaxRetryCount = properties.get(VFSConstants.MAX_RETRY_COUNT);
@@ -72,6 +73,17 @@
reconnectTimeout = VFSConstants.DEFAULT_RECONNECT_TIMEOUT;
}
+ if (properties.containsKey(VFSConstants.TRANSPORT_FILE_LOCKING)) {
+ String strFileLocking = properties.get(VFSConstants.TRANSPORT_FILE_LOCKING);
+ if (VFSConstants.TRANSPORT_FILE_LOCKING_ENABLED.equals(strFileLocking)) {
+ fileLocking = true;
+ } else if (VFSConstants.TRANSPORT_FILE_LOCKING_DISABLED.equals(strFileLocking)) {
+ fileLocking = false;
+ }
+ } else {
+ this.fileLocking = fileLocking;
+ }
+
if (properties.containsKey(VFSConstants.APPEND)) {
String strAppend = properties.get(VFSConstants.APPEND);
append = Boolean.parseBoolean(strAppend);
@@ -82,6 +94,7 @@
log.debug("Using the maxRetryCount : " + maxRetryCount);
log.debug("Using the reconnectionTimeout : " + reconnectTimeout);
log.debug("Using the append : " + append);
+ log.debug("File locking : " + (this.fileLocking ? "ON" : "OFF"));
}
}
@@ -124,4 +137,8 @@
public String getContentType() {
return contentType;
}
+
+ public boolean isFileLockingEnabled() {
+ return fileLocking;
+ }
}
\ No newline at end of file
Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java?rev=802480&r1=802479&r2=802480&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java Sun Aug 9 04:12:43 2009
@@ -27,6 +27,7 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.description.Parameter;
import org.apache.axis2.format.DataSourceMessageBuilder;
import org.apache.axis2.format.ManagedDataSource;
import org.apache.axis2.format.ManagedDataSourceFactory;
@@ -76,7 +77,9 @@
* smb://somehost/home
*
* axis2.xml - transport definition
- * <transportReceiver name="file" class="org.apache.synapse.transport.vfs.VFSTransportListener"/>
+ * <transportReceiver name="file" class="org.apache.synapse.transport.vfs.VFSTransportListener">
+ * <parameter name="transport.vfs.Locking">enable|disable</parameter> ?
+ * </transportReceiver>
*
* services.xml - service attachment
* required parameters
@@ -110,6 +113,14 @@
private FileSystemManager fsManager = null;
/**
+ * By default file locking in VFS transport is turned on at a global level
+ *
+ * NOTE: DO NOT USE THIS FLAG, USE PollTableEntry#isFileLockingEnabled() TO CHECK WHETHER
+ * FILE LOCKING IS ENABLED
+ */
+ private boolean globalFileLockingFlag = true;
+
+ /**
* Initializes the VFS transport by getting the VFS File System manager
* @param cfgCtx the Axsi2 configuration context
* @param trpInDesc the VFS transport in description from the axis2.xml
@@ -124,6 +135,14 @@
fsm.setConfiguration(getClass().getClassLoader().getResource("providers.xml"));
fsm.init();
fsManager = fsm;
+ Parameter lockFlagParam = trpInDesc.getParameter(VFSConstants.TRANSPORT_FILE_LOCKING);
+ if (lockFlagParam != null) {
+ String strLockingFlag = lockFlagParam.getValue().toString();
+ // enabled by default; turned off only when explicitly set to "disable"
+ if (VFSConstants.TRANSPORT_FILE_LOCKING_DISABLED.equals(strLockingFlag)) {
+ globalFileLockingFlag = false;
+ }
+ }
} catch (FileSystemException e) {
handleException("Error initializing the file transport : " + e.getMessage(), e);
}
@@ -196,14 +215,17 @@
if (children == null || children.length == 0) {
if (fileObject.getType() == FileType.FILE) {
- if (VFSUtils.acquireLock(fsManager, fileObject)) {
+ if (!entry.isFileLockingEnabled() || (entry.isFileLockingEnabled() &&
+ VFSUtils.acquireLock(fsManager, fileObject))) {
try {
processFile(entry, fileObject);
entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
metrics.incrementMessagesReceived();
} catch (AxisFault e) {
- VFSUtils.releaseLock(fsManager, fileObject);
+ if (entry.isFileLockingEnabled()) {
+ VFSUtils.releaseLock(fsManager, fileObject);
+ }
logException("Error processing File URI : "
+ fileObject.getName(), e);
entry.setLastPollState(PollTableEntry.FAILED);
@@ -211,7 +233,9 @@
}
moveOrDeleteAfterProcessing(entry, fileObject);
- VFSUtils.releaseLock(fsManager, fileObject);
+ if (entry.isFileLockingEnabled()) {
+ VFSUtils.releaseLock(fsManager, fileObject);
+ }
} else if (log.isDebugEnabled()) {
log.debug("Couldn't get the lock for processing the file : "
+ fileObject.getName());
@@ -231,7 +255,8 @@
}
if ((entry.getFileNamePattern() != null) && (
child.getName().getBaseName().matches(entry.getFileNamePattern()))
- && VFSUtils.acquireLock(fsManager, child)) {
+ && (!entry.isFileLockingEnabled() || (entry.isFileLockingEnabled()
+ && VFSUtils.acquireLock(fsManager, child)))) {
try {
if (log.isDebugEnabled()) {
log.debug("Processing file :" + child);
@@ -243,7 +268,9 @@
metrics.incrementMessagesReceived();
} catch (Exception e) {
- VFSUtils.releaseLock(fsManager, child);
+ if (entry.isFileLockingEnabled()) {
+ VFSUtils.releaseLock(fsManager, child);
+ }
logException("Error processing File URI : " + child.getName(), e);
failCount++;
// tell moveOrDeleteAfterProcessing() file failed
@@ -252,7 +279,9 @@
}
moveOrDeleteAfterProcessing(entry, child);
- VFSUtils.releaseLock(fsManager, child);
+ if (entry.isFileLockingEnabled()) {
+ VFSUtils.releaseLock(fsManager, child);
+ }
} else if (log.isDebugEnabled()) {
log.debug("Couldn't get the lock for processing the file : "
+ child.getName());
@@ -409,7 +438,7 @@
String replyFileURI = entry.getReplyFileURI();
if (replyFileURI != null) {
msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
- new VFSOutTransportInfo(replyFileURI));
+ new VFSOutTransportInfo(replyFileURI, entry.isFileLockingEnabled()));
}
// Determine the message builder to use
@@ -489,6 +518,6 @@
@Override
protected PollTableEntry createEndpoint() {
- return new PollTableEntry();
+ return new PollTableEntry(globalFileLockingFlag);
}
}
Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java?rev=802480&r1=802479&r2=802480&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java Sun Aug 9 04:12:43 2009
@@ -23,6 +23,7 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.description.Parameter;
import org.apache.axis2.transport.MessageFormatter;
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axis2.transport.base.AbstractTransportSender;
@@ -40,7 +41,9 @@
/**
* axis2.xml - transport definition
- * <transportSender name="file" class="org.apache.synapse.transport.vfs.VFSTransportSender"/>
+ * <transportSender name="file" class="org.apache.synapse.transport.vfs.VFSTransportSender">
+ * <parameter name="transport.vfs.Locking">enable|disable</parameter> ?
+ * </transportSender>
*/
public class VFSTransportSender extends AbstractTransportSender implements ManagementSupport {
@@ -50,6 +53,14 @@
private FileSystemManager fsManager = null;
/**
+ * By default file locking in VFS transport is turned on at a global level
+ *
+ * NOTE: DO NOT USE THIS FLAG, USE VFSOutTransportInfo#isFileLockingEnabled() TO CHECK WHETHER
+ * FILE LOCKING IS ENABLED
+ */
+ private boolean globalFileLockingFlag = true;
+
+ /**
* The public constructor
*/
public VFSTransportSender() {
@@ -71,6 +82,14 @@
fsm.setConfiguration(getClass().getClassLoader().getResource("providers.xml"));
fsm.init();
fsManager = fsm;
+ Parameter lckFlagParam = transportOut.getParameter(VFSConstants.TRANSPORT_FILE_LOCKING);
+ if (lckFlagParam != null) {
+ String strLockingFlag = lckFlagParam.getValue().toString();
+ // enabled by default; turned off only when explicitly set to "disable"
+ if (VFSConstants.TRANSPORT_FILE_LOCKING_DISABLED.equals(strLockingFlag)) {
+ globalFileLockingFlag = false;
+ }
+ }
} catch (FileSystemException e) {
handleException("Error initializing the file transport : " + e.getMessage(), e);
}
@@ -93,7 +112,7 @@
VFSOutTransportInfo vfsOutInfo = null;
if (targetAddress != null) {
- vfsOutInfo = new VFSOutTransportInfo(targetAddress);
+ vfsOutInfo = new VFSOutTransportInfo(targetAddress, globalFileLockingFlag);
} else if (outTransportInfo != null && outTransportInfo instanceof VFSOutTransportInfo) {
vfsOutInfo = (VFSOutTransportInfo) outTransportInfo;
}
@@ -144,28 +163,49 @@
FileObject responseFile = fsManager.resolveFile(replyFile,
VFSUtils.getFileName(msgCtx, vfsOutInfo));
- acquireLockForSending(responseFile, vfsOutInfo);
- if (!responseFile.exists()) {
- responseFile.createFile();
+ // if file locking is not disabled acquire the lock
+ // before uploading the file
+ if (vfsOutInfo.isFileLockingEnabled()) {
+ acquireLockForSending(responseFile, vfsOutInfo);
+ if (!responseFile.exists()) {
+ responseFile.createFile();
+ }
+ populateResponseFile(responseFile, msgCtx,append, true);
+ VFSUtils.releaseLock(fsManager, responseFile);
+ } else {
+ if (!responseFile.exists()) {
+ responseFile.createFile();
+ }
+ populateResponseFile(responseFile, msgCtx,append, false);
}
- populateResponseFile(responseFile, msgCtx, append);
- VFSUtils.releaseLock(fsManager, responseFile);
} else if (replyFile.getType() == FileType.FILE) {
-
- acquireLockForSending(replyFile, vfsOutInfo);
- populateResponseFile(replyFile, msgCtx, append);
- VFSUtils.releaseLock(fsManager, replyFile);
-
+
+ // if file locking is not disabled acquire the lock
+ // before uploading the file
+ if (vfsOutInfo.isFileLockingEnabled()) {
+ acquireLockForSending(replyFile, vfsOutInfo);
+ populateResponseFile(replyFile, msgCtx, append, true);
+ VFSUtils.releaseLock(fsManager, replyFile);
+ } else {
+ populateResponseFile(replyFile, msgCtx, append, false);
+ }
+
} else {
handleException("Unsupported reply file type : " + replyFile.getType() +
" for file : " + vfsOutInfo.getOutFileURI());
}
} else {
- acquireLockForSending(replyFile, vfsOutInfo);
- replyFile.createFile();
- populateResponseFile(replyFile, msgCtx, append);
- VFSUtils.releaseLock(fsManager, replyFile);
+ // if file locking is not disabled acquire the lock before uploading the file
+ if (vfsOutInfo.isFileLockingEnabled()) {
+ acquireLockForSending(replyFile, vfsOutInfo);
+ replyFile.createFile();
+ populateResponseFile(replyFile, msgCtx, append, true);
+ VFSUtils.releaseLock(fsManager, replyFile);
+ } else {
+ replyFile.createFile();
+ populateResponseFile(replyFile, msgCtx, append, false);
+ }
}
} catch (FileSystemException e) {
handleException("Error resolving reply file : " +
@@ -183,7 +223,7 @@
}
private void populateResponseFile(FileObject responseFile, MessageContext msgContext,
- boolean append) throws AxisFault {
+ boolean append, boolean lockingEnabled) throws AxisFault {
MessageFormatter messageFormatter = BaseUtils.getMessageFormatter(msgContext);
OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
@@ -202,11 +242,15 @@
metrics.incrementBytesSent(msgContext, os.getByteCount());
} catch (FileSystemException e) {
- VFSUtils.releaseLock(fsManager, responseFile);
+ if (lockingEnabled) {
+ VFSUtils.releaseLock(fsManager, responseFile);
+ }
metrics.incrementFaultsSending();
handleException("IO Error while creating response file : " + responseFile.getName(), e);
} catch (IOException e) {
- VFSUtils.releaseLock(fsManager, responseFile);
+ if (lockingEnabled) {
+ VFSUtils.releaseLock(fsManager, responseFile);
+ }
metrics.incrementFaultsSending();
handleException("IO Error while creating response file : " + responseFile.getName(), e);
}