You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ve...@apache.org on 2009/03/07 20:03:12 UTC

svn commit: r751307 - in /synapse/trunk/java/modules/transports/src: main/java/org/apache/synapse/transport/vfs/ test/java/org/apache/synapse/transport/vfs/

Author: veithen
Date: Sat Mar  7 19:03:11 2009
New Revision: 751307

URL: http://svn.apache.org/viewvc?rev=751307&view=rev
Log:
Allow streaming of large binary and text files through the VFS transport.

Added:
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java   (with props)
Modified:
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
    synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportTest.java

Added: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java?rev=751307&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java (added)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java Sat Mar  7 19:03:11 2009
@@ -0,0 +1,66 @@
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one
+*  or more contributor license agreements.  See the NOTICE file
+*  distributed with this work for additional information
+*  regarding copyright ownership.  The ASF licenses this file
+*  to you under the Apache License, Version 2.0 (the
+*  "License"); you may not use this file except in compliance
+*  with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing,
+*  software distributed under the License is distributed on an
+*   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+*  KIND, either express or implied.  See the License for the
+*  specific language governing permissions and limitations
+*  under the License.
+*/
+package org.apache.synapse.transport.vfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.axiom.attachments.SizeAwareDataSource;
+import org.apache.commons.vfs.FileObject;
+import org.apache.commons.vfs.FileSystemException;
+
+/**
+ * Data source that reads data from a VFS {@link FileObject}.
+ * This class is similar to VFS' own FileObjectDataSource implementation, but in addition
+ * implements {@link SizeAwareDataSource}.
+ */
+public class FileObjectDataSource implements SizeAwareDataSource {
+    private final FileObject file;
+    private final String contentType;
+
+    public FileObjectDataSource(FileObject file, String contentType) {
+        this.file = file;
+        this.contentType = contentType;
+    }
+
+    public long getSize() {
+        try {
+            return file.getContent().getSize();
+        } catch (FileSystemException ex) {
+            return -1;
+        }
+    }
+
+    public String getContentType() {
+        return contentType;
+    }
+
+    public String getName() {
+        return file.getName().getURI();
+    }
+
+    public InputStream getInputStream() throws IOException {
+        return file.getContent().getInputStream();
+    }
+
+    public OutputStream getOutputStream() throws IOException {
+        return file.getContent().getOutputStream();
+    }
+}

Propchange: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/FileObjectDataSource.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java?rev=751307&r1=751306&r2=751307&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java Sat Mar  7 19:03:11 2009
@@ -56,6 +56,8 @@
     /** moved file will have this formatted timestamp prefix */    
     private DateFormat moveTimestampFormat;
 
+    private boolean streaming;
+
     private int maxRetryCount;
     private long reconnectTimeout;
 
@@ -160,6 +162,14 @@
         }
     }
 
+    public boolean isStreaming() {
+        return streaming;
+    }
+
+    public void setStreaming(boolean streaming) {
+        this.streaming = streaming;
+    }
+
     public int getMaxRetryCount() {
       return maxRetryCount;
     }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java?rev=751307&r1=751306&r2=751307&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java Sat Mar  7 19:03:11 2009
@@ -42,6 +42,8 @@
 
     public static final String DEFAULT_RESPONSE_FILE = "response.xml";
     
+    public static final String STREAMING = "transport.vfs.Streaming";
+    
     public static final String MAX_RETRY_COUNT = "transport.vfs.MaxRetryCount";
     public static final String RECONNECT_TIMEOUT = "transport.vfs.ReconnectTimeout";
     public static final String APPEND = "transport.vfs.Append";

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java?rev=751307&r1=751306&r2=751307&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java Sat Mar  7 19:03:11 2009
@@ -18,16 +18,22 @@
 */
 package org.apache.synapse.transport.vfs;
 
-import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.description.*;
+import org.apache.axis2.format.DataSourceMessageBuilder;
+import org.apache.axis2.format.ManagedDataSource;
+import org.apache.axis2.format.ManagedDataSourceFactory;
 import org.apache.axis2.transport.TransportUtils;
 import org.apache.axis2.transport.base.BaseConstants;
 import org.apache.axis2.transport.base.BaseUtils;
 import org.apache.axis2.transport.base.AbstractPollingTransportListener;
 import org.apache.axis2.transport.base.ManagementSupport;
 import org.apache.axis2.transport.base.ParamUtils;
+import org.apache.axis2.builder.Builder;
+import org.apache.axis2.builder.BuilderUtil;
+import org.apache.axis2.builder.SOAPBuilder;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.commons.vfs.*;
@@ -36,7 +42,6 @@
 import javax.mail.internet.ContentType;
 import javax.mail.internet.ParseException;
 import javax.xml.namespace.QName;
-import javax.xml.stream.XMLStreamException;
 
 import java.util.*;
 import java.io.File;
@@ -418,18 +423,45 @@
                     new VFSOutTransportInfo((String) param.getValue()));
             }
 
+            // Determine the message builder to use
+            Builder builder;
+            if (contentType == null) {
+                log.debug("No content type specified. Using SOAP builder.");
+                builder = new SOAPBuilder();
+            } else {
+                int index = contentType.indexOf(';');
+                String type = index > 0 ? contentType.substring(0, index) : contentType;
+                builder = BuilderUtil.getBuilderFromSelector(type, msgContext);
+                if (builder == null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("No message builder found for type '" + type +
+                                "'. Falling back to SOAP.");
+                    }
+                    builder = new SOAPBuilder();
+                }
+            }
 
             // set the message payload to the message context
-            InputStream in = content.getInputStream();
+            InputStream in;
+            ManagedDataSource dataSource;
+            if (builder instanceof DataSourceMessageBuilder && entry.isStreaming()) {
+                in = null;
+                dataSource = ManagedDataSourceFactory.create(
+                        new FileObjectDataSource(file, contentType));
+            } else {
+                in = content.getInputStream();
+                dataSource = null;
+            }
+            
             try {
-                SOAPEnvelope envelope;
-                try {
-                    envelope = TransportUtils.createSOAPMessage(msgContext, in, contentType);
-                } catch (XMLStreamException ex) {
-                    handleException("Error parsing XML", ex);
-                    return;
+                OMElement documentElement;
+                if (in != null) {
+                    documentElement = builder.processDocument(in, contentType, msgContext);
+                } else {
+                    documentElement = ((DataSourceMessageBuilder)builder).processDocument(
+                            dataSource, contentType, msgContext);
                 }
-                msgContext.setEnvelope(envelope);
+                msgContext.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement));
                 
                 handleIncomingMessage(
                     msgContext,
@@ -439,10 +471,14 @@
                 );
             }
             finally {
-                try {
-                    in.close();
-                } catch (IOException ex) {
-                    handleException("Error closing stream", ex);
+                if (in != null) {
+                    try {
+                        in.close();
+                    } catch (IOException ex) {
+                        handleException("Error closing stream", ex);
+                    }
+                } else {
+                    dataSource.destroy();
                 }
             }
 
@@ -504,6 +540,11 @@
                 entry.setMoveTimestampFormat(moveTimestampFormat);
             }
 
+            String strStreaming = ParamUtils.getOptionalParam(params, VFSConstants.STREAMING);
+            if (strStreaming != null) {
+                entry.setStreaming(Boolean.parseBoolean(strStreaming));
+            }
+            
             String strMaxRetryCount = ParamUtils.getOptionalParam(
                 params, VFSConstants.MAX_RETRY_COUNT);
             if(strMaxRetryCount != null)

Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportTest.java?rev=751307&r1=751306&r2=751307&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportTest.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportTest.java Sat Mar  7 19:03:11 2009
@@ -48,6 +48,8 @@
         // Since VFS has no Content-Type header, SwA is not supported.
         suite.addExclude("(test=AsyncSwA)");
         
+        suite.addExclude("(!(&(test=AsyncBinary)(endpoint=axis)))");
+        
         TransportDescriptionFactory tdf =
             new SimpleTransportDescriptionFactory("vfs", VFSTransportListener.class,
                     VFSTransportSender.class);