You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ru...@apache.org on 2009/08/09 03:12:39 UTC
svn commit: r802472 - in
/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs:
FileObjectDataSource.java VFSConstants.java VFSOutTransportInfo.java
VFSTransportListener.java VFSTransportSender.java VFSUtils.java
Author: ruwan
Date: Sun Aug 9 01:12:39 2009
New Revision: 802472
URL: http://svn.apache.org/viewvc?rev=802472&view=rev
Log:
code reformatting and minor refactoring of the VFS transport - Part ii
Modified:
synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java
synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java
synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java
synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java
synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java
Modified: synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java?rev=802472&r1=802471&r2=802472&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java (original)
+++ synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java Sun Aug 9 01:12:39 2009
@@ -18,20 +18,21 @@
*/
package org.apache.synapse.transport.vfs;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
import org.apache.axiom.attachments.SizeAwareDataSource;
import org.apache.commons.vfs.FileObject;
import org.apache.commons.vfs.FileSystemException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
/**
* Data source that reads data from a VFS {@link FileObject}.
* This class is similar to VFS' own FileObjectDataSource implementation, but in addition
* implements {@link SizeAwareDataSource}.
*/
public class FileObjectDataSource implements SizeAwareDataSource {
+
private final FileObject file;
private final String contentType;
Modified: synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java?rev=802472&r1=802471&r2=802472&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java (original)
+++ synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java Sun Aug 9 01:12:39 2009
@@ -19,17 +19,24 @@
package org.apache.synapse.transport.vfs;
-public class VFSConstants {
+public final class VFSConstants {
+
// vfs transport prefix (e.g. used in an out EPR etc)
public static final String VFS_PREFIX = "vfs:";
- public static final String TRANSPORT_FILE_ACTION_AFTER_PROCESS = "transport.vfs.ActionAfterProcess";
- public static final String TRANSPORT_FILE_ACTION_AFTER_ERRORS = "transport.vfs.ActionAfterErrors";
- public static final String TRANSPORT_FILE_ACTION_AFTER_FAILURE = "transport.vfs.ActionAfterFailure";
-
- public static final String TRANSPORT_FILE_MOVE_AFTER_PROCESS = "transport.vfs.MoveAfterProcess";
- public static final String TRANSPORT_FILE_MOVE_AFTER_ERRORS = "transport.vfs.MoveAfterErrors";
- public static final String TRANSPORT_FILE_MOVE_AFTER_FAILURE = "transport.vfs.MoveAfterFailure";
+ public static final String TRANSPORT_FILE_ACTION_AFTER_PROCESS
+ = "transport.vfs.ActionAfterProcess";
+ public static final String TRANSPORT_FILE_ACTION_AFTER_ERRORS
+ = "transport.vfs.ActionAfterErrors";
+ public static final String TRANSPORT_FILE_ACTION_AFTER_FAILURE
+ = "transport.vfs.ActionAfterFailure";
+
+ public static final String TRANSPORT_FILE_MOVE_AFTER_PROCESS
+ = "transport.vfs.MoveAfterProcess";
+ public static final String TRANSPORT_FILE_MOVE_AFTER_ERRORS
+ = "transport.vfs.MoveAfterErrors";
+ public static final String TRANSPORT_FILE_MOVE_AFTER_FAILURE
+ = "transport.vfs.MoveAfterFailure";
public static final String TRANSPORT_FILE_FILE_URI = "transport.vfs.FileURI";
public static final String TRANSPORT_FILE_FILE_NAME_PATTERN = "transport.vfs.FileNamePattern";
@@ -38,7 +45,8 @@
public static final String REPLY_FILE_URI = "transport.vfs.ReplyFileURI";
public static final String REPLY_FILE_NAME = "transport.vfs.ReplyFileName";
- public static final String TRANSPORT_FILE_MOVE_TIMESTAMP_FORMAT = "transport.vfs.MoveTimestampFormat";
+ public static final String TRANSPORT_FILE_MOVE_TIMESTAMP_FORMAT
+ = "transport.vfs.MoveTimestampFormat";
public static final String DEFAULT_RESPONSE_FILE = "response.xml";
Modified: synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java?rev=802472&r1=802471&r2=802472&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java (original)
+++ synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java Sun Aug 9 01:12:39 2009
@@ -19,18 +19,19 @@
package org.apache.synapse.transport.vfs;
-import java.util.Map;
-
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axis2.transport.base.BaseUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.util.Map;
+
/**
* The VFS OutTransportInfo is a holder of information to send an outgoing message
* (e.g. a Response) to a VFS destination. Thus at a minimum a reference to a
* File URI (i.e. directory or a file) are held
*/
+
public class VFSOutTransportInfo implements OutTransportInfo {
private static final Log log = LogFactory.getLog(VFSOutTransportInfo.class);
@@ -42,7 +43,14 @@
private long reconnectTimeout = 30000;
private boolean append;
+ /**
+ * Constructs the VFSOutTransportInfo containing the information about the file to which the
+ * response has to be submitted to.
+ *
+ * @param outFileURI URI of the file to which the message is delivered
+ */
VFSOutTransportInfo(String outFileURI) {
+
if (outFileURI.startsWith(VFSConstants.VFS_PREFIX)) {
this.outFileURI = outFileURI.substring(VFSConstants.VFS_PREFIX.length());
} else {
@@ -50,18 +58,31 @@
}
Map<String,String> properties = BaseUtils.getEPRProperties(outFileURI);
- if(properties.containsKey(VFSConstants.MAX_RETRY_COUNT)) {
+ if (properties.containsKey(VFSConstants.MAX_RETRY_COUNT)) {
String strMaxRetryCount = properties.get(VFSConstants.MAX_RETRY_COUNT);
maxRetryCount = Integer.parseInt(strMaxRetryCount);
+ } else {
+ maxRetryCount = VFSConstants.DEFAULT_MAX_RETRY_COUNT;
}
- if(properties.containsKey(VFSConstants.RECONNECT_TIMEOUT)) {
+
+ if (properties.containsKey(VFSConstants.RECONNECT_TIMEOUT)) {
String strReconnectTimeout = properties.get(VFSConstants.RECONNECT_TIMEOUT);
reconnectTimeout = Long.parseLong(strReconnectTimeout) * 1000;
- }
+ } else {
+ reconnectTimeout = VFSConstants.DEFAULT_RECONNECT_TIMEOUT;
+ }
+
if (properties.containsKey(VFSConstants.APPEND)) {
String strAppend = properties.get(VFSConstants.APPEND);
append = Boolean.parseBoolean(strAppend);
}
+
+ if (log.isDebugEnabled()) {
+ log.debug("Using the fileURI : " + this.outFileURI);
+ log.debug("Using the maxRetryCount : " + maxRetryCount);
+ log.debug("Using the reconnectionTimeout : " + reconnectTimeout);
+ log.debug("Using the append : " + append);
+ }
}
public void setContentType(String contentType) {
@@ -77,19 +98,19 @@
}
public int getMaxRetryCount() {
- return maxRetryCount;
+ return maxRetryCount;
}
public void setMaxRetryCount(int maxRetryCount) {
- this.maxRetryCount = maxRetryCount;
+ this.maxRetryCount = maxRetryCount;
}
public long getReconnectTimeout() {
- return reconnectTimeout;
+ return reconnectTimeout;
}
public void setReconnectTimeout(long reconnectTimeout) {
- this.reconnectTimeout = reconnectTimeout;
+ this.reconnectTimeout = reconnectTimeout;
}
public boolean isAppend() {
@@ -99,4 +120,8 @@
public void setAppend(boolean append) {
this.append = append;
}
+
+ public String getContentType() {
+ return contentType;
+ }
}
\ No newline at end of file
Modified: synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java?rev=802472&r1=802471&r2=802472&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java Sun Aug 9 01:12:39 2009
@@ -21,29 +21,30 @@
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
-import org.apache.axis2.description.*;
+import org.apache.axis2.builder.Builder;
+import org.apache.axis2.builder.BuilderUtil;
+import org.apache.axis2.builder.SOAPBuilder;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.format.DataSourceMessageBuilder;
import org.apache.axis2.format.ManagedDataSource;
import org.apache.axis2.format.ManagedDataSourceFactory;
import org.apache.axis2.transport.TransportUtils;
+import org.apache.axis2.transport.base.AbstractPollingTransportListener;
import org.apache.axis2.transport.base.BaseConstants;
import org.apache.axis2.transport.base.BaseUtils;
-import org.apache.axis2.transport.base.AbstractPollingTransportListener;
import org.apache.axis2.transport.base.ManagementSupport;
-import org.apache.axis2.builder.Builder;
-import org.apache.axis2.builder.BuilderUtil;
-import org.apache.axis2.builder.SOAPBuilder;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
import org.apache.commons.vfs.*;
import org.apache.commons.vfs.impl.StandardFileSystemManager;
import javax.mail.internet.ContentType;
import javax.mail.internet.ParseException;
-
-import java.util.*;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
/**
* The "vfs" transport is a polling based transport - i.e. it gets kicked off at
@@ -168,7 +169,8 @@
} catch (FileSystemException e) {
log.error("cannot resolve fileObject", e);
if (maxRetryCount <= retryCount)
- processFailure("cannot resolve fileObject repeatedly: " + e.getMessage(), e, entry);
+ processFailure("cannot resolve fileObject repeatedly: "
+ + e.getMessage(), e, entry);
return;
}
@@ -210,7 +212,7 @@
moveOrDeleteAfterProcessing(entry, fileObject);
VFSUtils.releaseLock(fsManager, fileObject);
- } else {
+ } else if (log.isDebugEnabled()) {
log.debug("Couldn't get the lock for processing the file : "
+ fileObject.getName());
}
@@ -251,7 +253,7 @@
moveOrDeleteAfterProcessing(entry, child);
VFSUtils.releaseLock(fsManager, child);
- } else {
+ } else if (log.isDebugEnabled()) {
log.debug("Couldn't get the lock for processing the file : "
+ child.getName());
}
@@ -272,10 +274,8 @@
entry.setLastPollTime(now);
entry.setNextPollTime(now + entry.getPollInterval());
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Unable to access or read file or directory : " + fileURI);
- }
+ } else if (log.isDebugEnabled()) {
+ log.debug("Unable to access or read file or directory : " + fileURI);
}
onPollCompletion(entry);
} catch (FileSystemException e) {
@@ -317,7 +317,8 @@
} else {
prefix = "";
}
- FileObject dest = moveToDirectory.resolveFile(prefix + fileObject.getName().getBaseName());
+ FileObject dest = moveToDirectory.resolveFile(
+ prefix + fileObject.getName().getBaseName());
if (log.isDebugEnabled()) {
log.debug("Moving to file :" + dest.getName().getURI());
}
@@ -341,7 +342,8 @@
}
} catch (FileSystemException e) {
- log.error("Error resolving directory to move after processing : " + moveToDirectoryURI, e);
+ log.error("Error resolving directory to move after processing : "
+ + moveToDirectoryURI, e);
}
}
@@ -365,11 +367,8 @@
transportHeaders.put(VFSConstants.FILE_NAME, fileName);
try {
- transportHeaders.put(VFSConstants.FILE_LENGTH, Long.valueOf(content.getSize()));
- } catch (FileSystemException ignore) {}
- try {
- transportHeaders.put(VFSConstants.LAST_MODIFIED,
- Long.valueOf(content.getLastModifiedTime()));
+ transportHeaders.put(VFSConstants.FILE_LENGTH, content.getSize());
+ transportHeaders.put(VFSConstants.LAST_MODIFIED, content.getLastModifiedTime());
} catch (FileSystemException ignore) {}
MessageContext msgContext = entry.createMessageContext();
Modified: synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java?rev=802472&r1=802471&r2=802472&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java (original)
+++ synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java Sun Aug 9 01:12:39 2009
@@ -62,7 +62,9 @@
* @param transportOut the transport-out description
* @throws AxisFault on error
*/
- public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
+ public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut)
+ throws AxisFault {
+
super.init(cfgCtx, transportOut);
try {
StandardFileSystemManager fsm = new StandardFileSystemManager();
@@ -78,7 +80,6 @@
* Send the given message over the VFS transport
*
* @param msgCtx the axis2 message context
- * @return the result of the send operation / handler
* @throws AxisFault on error
*/
public void sendMessage(MessageContext msgCtx, String targetAddress,
@@ -103,8 +104,8 @@
boolean wasError = true;
int retryCount = 0;
- int maxRetryCount = VFSUtils.getMaxRetryCount(vfsOutInfo);
- long reconnectionTimeout = VFSUtils.getReconnectTimout(vfsOutInfo);
+ int maxRetryCount = vfsOutInfo.getMaxRetryCount();
+ long reconnectionTimeout = vfsOutInfo.getReconnectTimeout();
boolean append = vfsOutInfo.isAppend();
while (wasError) {
@@ -113,17 +114,18 @@
retryCount++;
replyFile = fsManager.resolveFile(vfsOutInfo.getOutFileURI());
- if(replyFile == null) {
+ if (replyFile == null) {
log.error("replyFile is null");
throw new FileSystemException("replyFile is null");
}
-
wasError = false;
- } catch(FileSystemException e) {
+ } catch (FileSystemException e) {
log.error("cannot resolve replyFile", e);
- if(maxRetryCount <= retryCount)
- handleException("cannot resolve replyFile repeatedly: " + e.getMessage(), e);
+ if(maxRetryCount <= retryCount) {
+ handleException("cannot resolve replyFile repeatedly: "
+ + e.getMessage(), e);
+ }
}
if (wasError) {
@@ -180,19 +182,25 @@
}
}
- private void populateResponseFile(FileObject responseFile, MessageContext msgContext, boolean append) throws AxisFault {
+ private void populateResponseFile(FileObject responseFile, MessageContext msgContext,
+ boolean append) throws AxisFault {
+
MessageFormatter messageFormatter = BaseUtils.getMessageFormatter(msgContext);
OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
+
try {
- CountingOutputStream os = new CountingOutputStream(responseFile.getContent().getOutputStream(append));
+ CountingOutputStream os = new CountingOutputStream(
+ responseFile.getContent().getOutputStream(append));
try {
messageFormatter.writeTo(msgContext, format, os, true);
} finally {
os.close();
}
+
// update metrics
metrics.incrementMessagesSent(msgContext);
metrics.incrementBytesSent(msgContext, os.getByteCount());
+
} catch (FileSystemException e) {
VFSUtils.releaseLock(fsManager, responseFile);
metrics.incrementFaultsSending();
@@ -206,6 +214,7 @@
private void acquireLockForSending(FileObject responseFile, VFSOutTransportInfo vfsOutInfo)
throws AxisFault {
+
int tryNum = 0;
// wait till we get the lock
while (!VFSUtils.acquireLock(fsManager, responseFile)) {
@@ -214,6 +223,7 @@
+ responseFile.getName() + ", unable to acquire the " +
"lock even after " + tryNum + " retries");
} else {
+
log.warn("Couldn't get the lock for the file : "
+ responseFile.getName() + ", retry : " + tryNum
+ " scheduled after : " + vfsOutInfo.getReconnectTimeout());
Modified: synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java?rev=802472&r1=802471&r2=802472&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java (original)
+++ synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java Sun Aug 9 01:12:39 2009
@@ -84,16 +84,6 @@
}
return fileName;
}
-
- public static int getMaxRetryCount(VFSOutTransportInfo vfsOutInfo) {
- return vfsOutInfo.getMaxRetryCount() != 0 ? vfsOutInfo.getMaxRetryCount() :
- VFSConstants.DEFAULT_MAX_RETRY_COUNT;
- }
-
- public static long getReconnectTimout(VFSOutTransportInfo vfsOutInfo) {
- return vfsOutInfo.getReconnectTimeout() != 0 ? vfsOutInfo.getReconnectTimeout() :
- VFSConstants.DEFAULT_RECONNECT_TIMEOUT;
- }
/**
* Acquires a file item lock before processing the item, guaranteing that the file is not