You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ve...@apache.org on 2009/03/07 20:03:12 UTC
svn commit: r751307 - in /synapse/trunk/java/modules/transports/src:
main/java/org/apache/synapse/transport/vfs/
test/java/org/apache/synapse/transport/vfs/
Author: veithen
Date: Sat Mar 7 19:03:11 2009
New Revision: 751307
URL: http://svn.apache.org/viewvc?rev=751307&view=rev
Log:
Allow streaming of large binary and text files through the VFS transport.
Added:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java (with props)
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportTest.java
Added: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java?rev=751307&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java (added)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java Sat Mar 7 19:03:11 2009
@@ -0,0 +1,66 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.transport.vfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.axiom.attachments.SizeAwareDataSource;
+import org.apache.commons.vfs.FileObject;
+import org.apache.commons.vfs.FileSystemException;
+
+/**
+ * Data source that reads data from a VFS {@link FileObject}.
+ * This class is similar to VFS' own FileObjectDataSource implementation, but in addition
+ * implements {@link SizeAwareDataSource}.
+ */
+public class FileObjectDataSource implements SizeAwareDataSource {
+ private final FileObject file;
+ private final String contentType;
+
+ public FileObjectDataSource(FileObject file, String contentType) {
+ this.file = file;
+ this.contentType = contentType;
+ }
+
+ public long getSize() {
+ try {
+ return file.getContent().getSize();
+ } catch (FileSystemException ex) {
+ return -1;
+ }
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public String getName() {
+ return file.getName().getURI();
+ }
+
+ public InputStream getInputStream() throws IOException {
+ return file.getContent().getInputStream();
+ }
+
+ public OutputStream getOutputStream() throws IOException {
+ return file.getContent().getOutputStream();
+ }
+}
Propchange: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java?rev=751307&r1=751306&r2=751307&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java Sat Mar 7 19:03:11 2009
@@ -56,6 +56,8 @@
/** moved file will have this formatted timestamp prefix */
private DateFormat moveTimestampFormat;
+ private boolean streaming;
+
private int maxRetryCount;
private long reconnectTimeout;
@@ -160,6 +162,14 @@
}
}
+ public boolean isStreaming() {
+ return streaming;
+ }
+
+ public void setStreaming(boolean streaming) {
+ this.streaming = streaming;
+ }
+
public int getMaxRetryCount() {
return maxRetryCount;
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java?rev=751307&r1=751306&r2=751307&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java Sat Mar 7 19:03:11 2009
@@ -42,6 +42,8 @@
public static final String DEFAULT_RESPONSE_FILE = "response.xml";
+ public static final String STREAMING = "transport.vfs.Streaming";
+
public static final String MAX_RETRY_COUNT = "transport.vfs.MaxRetryCount";
public static final String RECONNECT_TIMEOUT = "transport.vfs.ReconnectTimeout";
public static final String APPEND = "transport.vfs.Append";
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java?rev=751307&r1=751306&r2=751307&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java Sat Mar 7 19:03:11 2009
@@ -18,16 +18,22 @@
*/
package org.apache.synapse.transport.vfs;
-import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.description.*;
+import org.apache.axis2.format.DataSourceMessageBuilder;
+import org.apache.axis2.format.ManagedDataSource;
+import org.apache.axis2.format.ManagedDataSourceFactory;
import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.transport.base.BaseConstants;
import org.apache.axis2.transport.base.BaseUtils;
import org.apache.axis2.transport.base.AbstractPollingTransportListener;
import org.apache.axis2.transport.base.ManagementSupport;
import org.apache.axis2.transport.base.ParamUtils;
+import org.apache.axis2.builder.Builder;
+import org.apache.axis2.builder.BuilderUtil;
+import org.apache.axis2.builder.SOAPBuilder;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.commons.vfs.*;
@@ -36,7 +42,6 @@
import javax.mail.internet.ContentType;
import javax.mail.internet.ParseException;
import javax.xml.namespace.QName;
-import javax.xml.stream.XMLStreamException;
import java.util.*;
import java.io.File;
@@ -418,18 +423,45 @@
new VFSOutTransportInfo((String) param.getValue()));
}
+ // Determine the message builder to use
+ Builder builder;
+ if (contentType == null) {
+ log.debug("No content type specified. Using SOAP builder.");
+ builder = new SOAPBuilder();
+ } else {
+ int index = contentType.indexOf(';');
+ String type = index > 0 ? contentType.substring(0, index) : contentType;
+ builder = BuilderUtil.getBuilderFromSelector(type, msgContext);
+ if (builder == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("No message builder found for type '" + type +
+ "'. Falling back to SOAP.");
+ }
+ builder = new SOAPBuilder();
+ }
+ }
// set the message payload to the message context
- InputStream in = content.getInputStream();
+ InputStream in;
+ ManagedDataSource dataSource;
+ if (builder instanceof DataSourceMessageBuilder && entry.isStreaming()) {
+ in = null;
+ dataSource = ManagedDataSourceFactory.create(
+ new FileObjectDataSource(file, contentType));
+ } else {
+ in = content.getInputStream();
+ dataSource = null;
+ }
+
try {
- SOAPEnvelope envelope;
- try {
- envelope = TransportUtils.createSOAPMessage(msgContext, in, contentType);
- } catch (XMLStreamException ex) {
- handleException("Error parsing XML", ex);
- return;
+ OMElement documentElement;
+ if (in != null) {
+ documentElement = builder.processDocument(in, contentType, msgContext);
+ } else {
+ documentElement = ((DataSourceMessageBuilder)builder).processDocument(
+ dataSource, contentType, msgContext);
}
- msgContext.setEnvelope(envelope);
+ msgContext.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement));
handleIncomingMessage(
msgContext,
@@ -439,10 +471,14 @@
);
}
finally {
- try {
- in.close();
- } catch (IOException ex) {
- handleException("Error closing stream", ex);
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException ex) {
+ handleException("Error closing stream", ex);
+ }
+ } else {
+ dataSource.destroy();
}
}
@@ -504,6 +540,11 @@
entry.setMoveTimestampFormat(moveTimestampFormat);
}
+ String strStreaming = ParamUtils.getOptionalParam(params, VFSConstants.STREAMING);
+ if (strStreaming != null) {
+ entry.setStreaming(Boolean.parseBoolean(strStreaming));
+ }
+
String strMaxRetryCount = ParamUtils.getOptionalParam(
params, VFSConstants.MAX_RETRY_COUNT);
if(strMaxRetryCount != null)
Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportTest.java?rev=751307&r1=751306&r2=751307&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportTest.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportTest.java Sat Mar 7 19:03:11 2009
@@ -48,6 +48,8 @@
// Since VFS has no Content-Type header, SwA is not supported.
suite.addExclude("(test=AsyncSwA)");
+ suite.addExclude("(!(&(test=AsyncBinary)(endpoint=axis)))");
+
TransportDescriptionFactory tdf =
new SimpleTransportDescriptionFactory("vfs", VFSTransportListener.class,
VFSTransportSender.class);