You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ru...@apache.org on 2009/08/09 06:12:44 UTC

svn commit: r802480 - in /synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs: PollTableEntry.java VFSConstants.java VFSOutTransportInfo.java VFSTransportListener.java VFSTransportSender.java

Author: ruwan
Date: Sun Aug  9 04:12:43 2009
New Revision: 802480

URL: http://svn.apache.org/viewvc?rev=802480&view=rev
Log:
making the VFS transport locking configurable at a global level (either at the sender or receiver level) as well as at a service/endpoint level.

Modified:
    synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
    synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java
    synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java
    synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
    synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java

Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java?rev=802480&r1=802479&r2=802480&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java Sun Aug  9 04:12:43 2009
@@ -66,6 +66,11 @@
 
     private int maxRetryCount;
     private long reconnectTimeout;
+    private boolean fileLocking;
+
+    public PollTableEntry(boolean fileLocking) {
+        this.fileLocking = fileLocking;
+    }
 
     @Override
     public EndpointReference[] getEndpointReferences(String ip) {
@@ -152,6 +157,10 @@
         return maxRetryCount;
     }
 
+    public boolean isFileLockingEnabled() {
+        return fileLocking;
+    }
+
     public long getReconnectTimeout() {
         return reconnectTimeout;
     }
@@ -226,6 +235,14 @@
             reconnectTimeout = strReconnectTimeout != null ?
                     Integer.parseInt(strReconnectTimeout) * 1000 :
                     VFSConstants.DEFAULT_RECONNECT_TIMEOUT;
+
+            String strFileLocking = ParamUtils.getOptionalParam(
+                    params, VFSConstants.TRANSPORT_FILE_LOCKING);
+            if (VFSConstants.TRANSPORT_FILE_LOCKING_ENABLED.equals(strFileLocking)) {
+                fileLocking = true;
+            } else if (VFSConstants.TRANSPORT_FILE_LOCKING_DISABLED.equals(strFileLocking)) {
+                fileLocking = false;
+            }
             
             return super.loadConfiguration(params);
         }

Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java?rev=802480&r1=802479&r2=802480&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java Sun Aug  9 04:12:43 2009
@@ -41,6 +41,9 @@
     public static final String TRANSPORT_FILE_FILE_URI = "transport.vfs.FileURI";
     public static final String TRANSPORT_FILE_FILE_NAME_PATTERN = "transport.vfs.FileNamePattern";
     public static final String TRANSPORT_FILE_CONTENT_TYPE = "transport.vfs.ContentType";
+    public static final String TRANSPORT_FILE_LOCKING = "transport.vfs.Locking";    
+    public static final String TRANSPORT_FILE_LOCKING_ENABLED = "enable";    
+    public static final String TRANSPORT_FILE_LOCKING_DISABLED = "disable";
 
     public static final String REPLY_FILE_URI = "transport.vfs.ReplyFileURI";
     public static final String REPLY_FILE_NAME = "transport.vfs.ReplyFileName";

Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java?rev=802480&r1=802479&r2=802480&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java Sun Aug  9 04:12:43 2009
@@ -42,6 +42,7 @@
     private int maxRetryCount = 3;
     private long reconnectTimeout = 30000;
     private boolean append;
+    private boolean fileLocking;
 
     /**
      * Constructs the VFSOutTransportInfo containing the information about the file to which the
@@ -49,14 +50,14 @@
      * 
      * @param outFileURI URI of the file to which the message is delivered
      */
-    VFSOutTransportInfo(String outFileURI) {
+    VFSOutTransportInfo(String outFileURI, boolean fileLocking) {
         
         if (outFileURI.startsWith(VFSConstants.VFS_PREFIX)) {
             this.outFileURI = outFileURI.substring(VFSConstants.VFS_PREFIX.length());
         } else {
             this.outFileURI = outFileURI;
         }
-        
+
         Map<String,String> properties = BaseUtils.getEPRProperties(outFileURI);
         if (properties.containsKey(VFSConstants.MAX_RETRY_COUNT)) {
             String strMaxRetryCount = properties.get(VFSConstants.MAX_RETRY_COUNT);
@@ -72,6 +73,17 @@
             reconnectTimeout = VFSConstants.DEFAULT_RECONNECT_TIMEOUT;
         }
 
+        if (properties.containsKey(VFSConstants.TRANSPORT_FILE_LOCKING)) {
+            String strFileLocking = properties.get(VFSConstants.TRANSPORT_FILE_LOCKING);
+            if (VFSConstants.TRANSPORT_FILE_LOCKING_ENABLED.equals(strFileLocking)) {
+                fileLocking = true;
+            } else if (VFSConstants.TRANSPORT_FILE_LOCKING_DISABLED.equals(strFileLocking)) {
+                fileLocking = false;
+            }
+        } else {
+            this.fileLocking = fileLocking;
+        }
+
         if (properties.containsKey(VFSConstants.APPEND)) {
             String strAppend = properties.get(VFSConstants.APPEND);
             append = Boolean.parseBoolean(strAppend);
@@ -82,6 +94,7 @@
             log.debug("Using the maxRetryCount  : " + maxRetryCount);
             log.debug("Using the reconnectionTimeout : " + reconnectTimeout);
             log.debug("Using the append         : " + append);
+            log.debug("File locking             : " + (this.fileLocking ? "ON" : "OFF"));
         }
     }
 
@@ -124,4 +137,8 @@
     public String getContentType() {
         return contentType;
     }
+
+    public boolean isFileLockingEnabled() {
+        return fileLocking;
+    }
 }
\ No newline at end of file

Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java?rev=802480&r1=802479&r2=802480&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java Sun Aug  9 04:12:43 2009
@@ -27,6 +27,7 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.description.Parameter;
 import org.apache.axis2.format.DataSourceMessageBuilder;
 import org.apache.axis2.format.ManagedDataSource;
 import org.apache.axis2.format.ManagedDataSourceFactory;
@@ -76,7 +77,9 @@
  * smb://somehost/home
  *
  * axis2.xml - transport definition
- *  <transportReceiver name="file" class="org.apache.synapse.transport.vfs.VFSTransportListener"/>
+ *  <transportReceiver name="file" class="org.apache.synapse.transport.vfs.VFSTransportListener">
+ *      <parameter name="transport.vfs.Locking">enable|disable</parameter> ?
+ *  </transportReceiver>
  *
  * services.xml - service attachment
  *  required parameters
@@ -110,6 +113,14 @@
     private FileSystemManager fsManager = null;
 
     /**
+     * By default file locking in VFS transport is turned on at a global level
+     *
+     * NOTE: DO NOT USE THIS FLAG, USE PollTableEntry#isFileLockingEnabled() TO CHECK WHETHR
+     * FILE LOCKING IS ENABLED
+     */
+    private boolean globalFileLockingFlag = true;
+
+    /**
      * Initializes the VFS transport by getting the VFS File System manager
      * @param cfgCtx the Axsi2 configuration context
      * @param trpInDesc the VFS transport in description from the axis2.xml
@@ -124,6 +135,14 @@
             fsm.setConfiguration(getClass().getClassLoader().getResource("providers.xml"));
             fsm.init();
             fsManager = fsm;
+            Parameter lockFlagParam = trpInDesc.getParameter(VFSConstants.TRANSPORT_FILE_LOCKING);
+            if (lockFlagParam != null) {
+                String strLockingFlag = lockFlagParam.getValue().toString();
+                // by-default enabled, if explicitly specified as "disable" make it disable
+                if (VFSConstants.TRANSPORT_FILE_LOCKING_DISABLED.equals(strLockingFlag)) {
+                    globalFileLockingFlag = false;
+                }
+            }
         } catch (FileSystemException e) {
             handleException("Error initializing the file transport : " + e.getMessage(), e);
         }
@@ -196,14 +215,17 @@
                 if (children == null || children.length == 0) {
 
                     if (fileObject.getType() == FileType.FILE) {
-                        if (VFSUtils.acquireLock(fsManager, fileObject)) {
+                        if (!entry.isFileLockingEnabled() || (entry.isFileLockingEnabled() &&
+                                VFSUtils.acquireLock(fsManager, fileObject))) {
                             try {
                                 processFile(entry, fileObject);
                                 entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
                                 metrics.incrementMessagesReceived();
 
                             } catch (AxisFault e) {
-                                VFSUtils.releaseLock(fsManager, fileObject);
+                                if (entry.isFileLockingEnabled()) {
+                                    VFSUtils.releaseLock(fsManager, fileObject);
+                                }
                                 logException("Error processing File URI : "
                                         + fileObject.getName(), e);
                                 entry.setLastPollState(PollTableEntry.FAILED);
@@ -211,7 +233,9 @@
                             }
 
                             moveOrDeleteAfterProcessing(entry, fileObject);
-                            VFSUtils.releaseLock(fsManager, fileObject);
+                            if (entry.isFileLockingEnabled()) {
+                                VFSUtils.releaseLock(fsManager, fileObject);
+                            }
                         } else if (log.isDebugEnabled()) {
                             log.debug("Couldn't get the lock for processing the file : "
                                     + fileObject.getName());
@@ -231,7 +255,8 @@
                         }
                         if ((entry.getFileNamePattern() != null) && (
                                 child.getName().getBaseName().matches(entry.getFileNamePattern()))
-                                && VFSUtils.acquireLock(fsManager, child)) {
+                                && (!entry.isFileLockingEnabled() || (entry.isFileLockingEnabled()
+                                && VFSUtils.acquireLock(fsManager, child)))) {
                             try {
                                 if (log.isDebugEnabled()) {
                                     log.debug("Processing file :" + child);
@@ -243,7 +268,9 @@
                                 metrics.incrementMessagesReceived();
 
                             } catch (Exception e) {
-                                VFSUtils.releaseLock(fsManager, child);
+                                if (entry.isFileLockingEnabled()) {
+                                    VFSUtils.releaseLock(fsManager, child);
+                                }
                                 logException("Error processing File URI : " + child.getName(), e);
                                 failCount++;
                                 // tell moveOrDeleteAfterProcessing() file failed
@@ -252,7 +279,9 @@
                             }
 
                             moveOrDeleteAfterProcessing(entry, child);
-                            VFSUtils.releaseLock(fsManager, child);
+                            if (entry.isFileLockingEnabled()) {
+                                VFSUtils.releaseLock(fsManager, child);
+                            }
                         } else if (log.isDebugEnabled()) {
                             log.debug("Couldn't get the lock for processing the file : "
                                     + child.getName());
@@ -409,7 +438,7 @@
             String replyFileURI = entry.getReplyFileURI();
             if (replyFileURI != null) {
                 msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
-                        new VFSOutTransportInfo(replyFileURI));
+                        new VFSOutTransportInfo(replyFileURI, entry.isFileLockingEnabled()));
             }
 
             // Determine the message builder to use
@@ -489,6 +518,6 @@
 
     @Override
     protected PollTableEntry createEndpoint() {
-        return new PollTableEntry();
+        return new PollTableEntry(globalFileLockingFlag);
     }
 }

Modified: synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java?rev=802480&r1=802479&r2=802480&view=diff
==============================================================================
--- synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java (original)
+++ synapse/branches/1.3/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java Sun Aug  9 04:12:43 2009
@@ -23,6 +23,7 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.description.Parameter;
 import org.apache.axis2.transport.MessageFormatter;
 import org.apache.axis2.transport.OutTransportInfo;
 import org.apache.axis2.transport.base.AbstractTransportSender;
@@ -40,7 +41,9 @@
 
 /**
  * axis2.xml - transport definition
- *  <transportSender name="file" class="org.apache.synapse.transport.vfs.VFSTransportSender"/>
+ *  <transportSender name="file" class="org.apache.synapse.transport.vfs.VFSTransportSender">
+ *      <parameter name="transport.vfs.Locking">enable|disable</parameter> ?
+ *  </transportSender>
  */
 public class VFSTransportSender extends AbstractTransportSender implements ManagementSupport {
 
@@ -50,6 +53,14 @@
     private FileSystemManager fsManager = null;
 
     /**
+     * By default file locking in VFS transport is turned on at a global level
+     *
+     * NOTE: DO NOT USE THIS FLAG, USE PollTableEntry#isFileLockingEnabled() TO CHECK WHETHR
+     * FILE LOCKING IS ENABLED
+     */
+    private boolean globalFileLockingFlag = true;
+
+    /**
      * The public constructor
      */
     public VFSTransportSender() {
@@ -71,6 +82,14 @@
             fsm.setConfiguration(getClass().getClassLoader().getResource("providers.xml"));
             fsm.init();
             fsManager = fsm;
+            Parameter lckFlagParam = transportOut.getParameter(VFSConstants.TRANSPORT_FILE_LOCKING);
+            if (lckFlagParam != null) {
+                String strLockingFlag = lckFlagParam.getValue().toString();
+                // by-default enabled, if explicitly specified as "disable" make it disable
+                if (VFSConstants.TRANSPORT_FILE_LOCKING_DISABLED.equals(strLockingFlag)) {
+                    globalFileLockingFlag = false;
+                }
+            }
         } catch (FileSystemException e) {
             handleException("Error initializing the file transport : " + e.getMessage(), e);
         }
@@ -93,7 +112,7 @@
         VFSOutTransportInfo vfsOutInfo = null;
 
         if (targetAddress != null) {
-            vfsOutInfo = new VFSOutTransportInfo(targetAddress);
+            vfsOutInfo = new VFSOutTransportInfo(targetAddress, globalFileLockingFlag);
         } else if (outTransportInfo != null && outTransportInfo instanceof VFSOutTransportInfo) {
             vfsOutInfo = (VFSOutTransportInfo) outTransportInfo;
         }
@@ -144,28 +163,49 @@
                         FileObject responseFile = fsManager.resolveFile(replyFile,
                                 VFSUtils.getFileName(msgCtx, vfsOutInfo));
 
-                        acquireLockForSending(responseFile, vfsOutInfo);
-                        if (!responseFile.exists()) {
-                            responseFile.createFile();
+                        // if file locking is not disabled acquire the lock
+                        // before uploading the file
+                        if (vfsOutInfo.isFileLockingEnabled()) {
+                            acquireLockForSending(responseFile, vfsOutInfo);
+                            if (!responseFile.exists()) {
+                                responseFile.createFile();
+                            }
+                            populateResponseFile(responseFile, msgCtx,append, true);
+                            VFSUtils.releaseLock(fsManager, responseFile);
+                        } else {
+                            if (!responseFile.exists()) {
+                                responseFile.createFile();
+                            }
+                            populateResponseFile(responseFile, msgCtx,append, false);
                         }
-                        populateResponseFile(responseFile, msgCtx, append);
-                        VFSUtils.releaseLock(fsManager, responseFile);
 
                     } else if (replyFile.getType() == FileType.FILE) {
-                        
-                        acquireLockForSending(replyFile, vfsOutInfo);
-                        populateResponseFile(replyFile, msgCtx, append);
-                        VFSUtils.releaseLock(fsManager, replyFile);
-                        
+
+                        // if file locking is not disabled acquire the lock
+                        // before uploading the file
+                        if (vfsOutInfo.isFileLockingEnabled()) {
+                            acquireLockForSending(replyFile, vfsOutInfo);
+                            populateResponseFile(replyFile, msgCtx, append, true);
+                            VFSUtils.releaseLock(fsManager, replyFile);
+                        } else {
+                            populateResponseFile(replyFile, msgCtx, append, false);
+                        }
+
                     } else {
                         handleException("Unsupported reply file type : " + replyFile.getType() +
                                 " for file : " + vfsOutInfo.getOutFileURI());
                     }
                 } else {
-                    acquireLockForSending(replyFile, vfsOutInfo);
-                    replyFile.createFile();
-                    populateResponseFile(replyFile, msgCtx, append);
-                    VFSUtils.releaseLock(fsManager, replyFile);
+                    // if file locking is not disabled acquire the lock before uploading the file
+                    if (vfsOutInfo.isFileLockingEnabled()) {
+                        acquireLockForSending(replyFile, vfsOutInfo);
+                        replyFile.createFile();
+                        populateResponseFile(replyFile, msgCtx, append, true);
+                        VFSUtils.releaseLock(fsManager, replyFile);
+                    } else {
+                        replyFile.createFile();
+                        populateResponseFile(replyFile, msgCtx, append, false);
+                    }
                 }
             } catch (FileSystemException e) {
                 handleException("Error resolving reply file : " +
@@ -183,7 +223,7 @@
     }
 
     private void populateResponseFile(FileObject responseFile, MessageContext msgContext,
-                                      boolean append) throws AxisFault {
+                                      boolean append, boolean lockingEnabled) throws AxisFault {
         
         MessageFormatter messageFormatter = BaseUtils.getMessageFormatter(msgContext);
         OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
@@ -202,11 +242,15 @@
             metrics.incrementBytesSent(msgContext, os.getByteCount());
             
         } catch (FileSystemException e) {
-            VFSUtils.releaseLock(fsManager, responseFile);
+            if (lockingEnabled) {
+                VFSUtils.releaseLock(fsManager, responseFile);
+            }
             metrics.incrementFaultsSending();
             handleException("IO Error while creating response file : " + responseFile.getName(), e);
         } catch (IOException e) {
-            VFSUtils.releaseLock(fsManager, responseFile);
+            if (lockingEnabled) {
+                VFSUtils.releaseLock(fsManager, responseFile);
+            }
             metrics.incrementFaultsSending();
             handleException("IO Error while creating response file : " + responseFile.getName(), e);
         }