You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/07/28 17:26:28 UTC

svn commit: r798561 - in /cxf/trunk: rt/core/src/main/java/org/apache/cxf/attachment/ rt/core/src/test/java/org/apache/cxf/attachment/ rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/ systests/src/test/java/org/apache/cxf/systest/jms/ systes...

Author: dkulp
Date: Tue Jul 28 15:26:25 2009
New Revision: 798561

URL: http://svn.apache.org/viewvc?rev=798561&view=rev
Log:
Update to really allow streaming of mtom attachments with jaxb
  Also close the underlying stream

Modified:
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDataSource.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDeserializer.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/DelegatingInputStream.java
    cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/LazyAttachmentCollection.java
    cxf/trunk/rt/core/src/test/java/org/apache/cxf/attachment/AttachmentDeserializerTest.java
    cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom/ClientMtomXopTest.java

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDataSource.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDataSource.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDataSource.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDataSource.java Tue Jul 28 15:26:25 2009
@@ -31,17 +31,35 @@
 public class AttachmentDataSource implements DataSource {
 
     private final String ct;    
-    private final CachedOutputStream cache;
+    private CachedOutputStream cache;
+    private InputStream ins;
+    private DelegatingInputStream lastIns;
     
     public AttachmentDataSource(String ctParam, InputStream inParam) throws IOException {
         this.ct = ctParam;        
-        cache = new CachedOutputStream();
-        IOUtils.copy(inParam, cache);
-        cache.lockOutputStream();
+        ins = inParam;
     }
 
+    public boolean isCached() {
+        return cache != null;
+    }
+    
     public void hold() {
-        cache.holdTempFile();
+        try {
+            if (cache == null) {
+                cache = new CachedOutputStream();
+                IOUtils.copy(ins, cache);
+                cache.lockOutputStream();
+                cache.holdTempFile();
+                ins.close();
+                ins = null;
+                if (lastIns != null) {
+                    lastIns.setInputStream(cache.getInputStream());
+                }
+            }
+        } catch (IOException e) {
+            //shouldn't happen
+        }
     }
     public void release() {
         cache.releaseTempFileHold();
@@ -53,9 +71,14 @@
 
     public InputStream getInputStream() {
         try {
-            return new DelegatingInputStream(cache.getInputStream());
+            if (cache != null) {
+                return cache.getInputStream();
+            }
+            if (ins instanceof DelegatingInputStream) {
+                lastIns = (DelegatingInputStream)ins;
+            }
+            return ins;
         } catch (IOException e) {
-            e.printStackTrace();
             return null;
         }
     }

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDeserializer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDeserializer.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDeserializer.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDeserializer.java Tue Jul 28 15:26:25 2009
@@ -58,6 +58,8 @@
 
     private int pbAmount = 2048;
     private PushbackInputStream stream;
+    private int createCount; 
+    private int closedCount;
 
     private byte boundary[];
 
@@ -122,7 +124,9 @@
                 throw new RuntimeException(e);
             }
 
-            body = new DelegatingInputStream(new MimeBodyPartInputStream(stream, boundary, pbAmount));
+            body = new DelegatingInputStream(new MimeBodyPartInputStream(stream, boundary, pbAmount),
+                                             this);
+            createCount++;
             message.setContent(InputStream.class, body);
         }
     }
@@ -199,13 +203,15 @@
             && !((DelegatingInputStream) body).isClosed()) {
 
             cache((DelegatingInputStream) body, true);
-            message.setContent(InputStream.class, body);
         }
 
         for (Attachment a : attachments.getLoadedAttachments()) {
             DataSource s = a.getDataHandler().getDataSource();
-            if (!(s instanceof AttachmentDataSource)) {
-                //AttachementDataSource objects are already cached
+            if (s instanceof AttachmentDataSource) {
+                if (!((AttachmentDataSource)s).isCached()) {
+                    cache((DelegatingInputStream) s.getInputStream(), false);
+                }
+            } else {
                 cache((DelegatingInputStream) s.getInputStream(), false);
             }
         }
@@ -279,7 +285,9 @@
      */
     private Attachment createAttachment(InternetHeaders headers) throws IOException {
         InputStream partStream = 
-            new DelegatingInputStream(new MimeBodyPartInputStream(stream, boundary, pbAmount));
+            new DelegatingInputStream(new MimeBodyPartInputStream(stream, boundary, pbAmount),
+                                      this);
+        createCount++;
         return AttachmentUtil.createAttachment(partStream, headers);
     }
 
@@ -290,4 +298,11 @@
     public void setLazyLoading(boolean lazyLoading) {
         this.lazyLoading = lazyLoading;
     }
+
+    public void markClosed(DelegatingInputStream delegatingInputStream) throws IOException {
+        closedCount++;
+        if (closedCount == createCount && !attachments.hasNext()) {
+            stream.close();
+        }
+    }
 }

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/DelegatingInputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/DelegatingInputStream.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/DelegatingInputStream.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/DelegatingInputStream.java Tue Jul 28 15:26:25 2009
@@ -25,19 +25,24 @@
 
 final class DelegatingInputStream extends InputStream {
     private InputStream is;
+    private AttachmentDeserializer deserializer;
     private boolean isClosed;
 
     /**
      * @param source
      */
-    DelegatingInputStream(InputStream is) {
+    DelegatingInputStream(InputStream is, AttachmentDeserializer ads) {
         this.is = is;
+        deserializer = ads;
     }
 
     @Override
     public void close() throws IOException {
         is.close();
         isClosed = true;
+        if (!isClosed) {
+            deserializer.markClosed(this);
+        }
     }
 
     public boolean isClosed() {

Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/LazyAttachmentCollection.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/LazyAttachmentCollection.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/LazyAttachmentCollection.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/LazyAttachmentCollection.java Tue Jul 28 15:26:25 2009
@@ -58,6 +58,15 @@
             throw new RuntimeException(e);
         }
     }
+    public boolean hasNext() throws IOException {
+        Attachment a = deserializer.readNext();
+        if (a != null) {
+            attachments.add(a);
+            return true;
+        }
+        return false;
+    }
+
 
     public Iterator<Attachment> iterator() {
         return new Iterator<Attachment>() {

Modified: cxf/trunk/rt/core/src/test/java/org/apache/cxf/attachment/AttachmentDeserializerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/test/java/org/apache/cxf/attachment/AttachmentDeserializerTest.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/test/java/org/apache/cxf/attachment/AttachmentDeserializerTest.java (original)
+++ cxf/trunk/rt/core/src/test/java/org/apache/cxf/attachment/AttachmentDeserializerTest.java Tue Jul 28 15:26:25 2009
@@ -20,7 +20,6 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.FileInputStream;
 import java.io.InputStream;
 import java.io.PushbackInputStream;
 import java.util.Collection;
@@ -76,11 +75,6 @@
         
         InputStream attIs = a.getDataHandler().getInputStream();
         
-        // We need to cache the InputStream for reusing the AttachmentDataSource
-        //assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof MimeBodyPartInputStream);
-        assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof ByteArrayInputStream);
-        assertTrue(((DelegatingInputStream) attBody).getInputStream() instanceof ByteArrayInputStream);
-        
         // check the cached output stream
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         IOUtils.copy(attBody, out);
@@ -125,11 +119,6 @@
 
         InputStream attIs = a.getDataHandler().getInputStream();
 
-        // We need to cache the InputStream for reusing the AttachmentDataSource
-        //assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof MimeBodyPartInputStream);
-        assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof ByteArrayInputStream);
-        assertTrue(((DelegatingInputStream) attBody).getInputStream() instanceof ByteArrayInputStream);
-
         // check the cached output stream
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         IOUtils.copy(attBody, out);
@@ -174,12 +163,6 @@
         
         InputStream attIs = a.getDataHandler().getInputStream();
 
-        // We need to cache the InputStream for reusing the AttachmentDataSource
-        //assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof MimeBodyPartInputStream);
-        assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof ByteArrayInputStream);
-
-        assertTrue(((DelegatingInputStream) attBody).getInputStream() instanceof ByteArrayInputStream);
-        
         // check the cached output stream
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         IOUtils.copy(attBody, out);
@@ -222,11 +205,6 @@
         
         InputStream attIs = a.getDataHandler().getInputStream();
         
-        // We need to cache the InputStream for reusing the AttachmentDataSource
-        //assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof MimeBodyPartInputStream);
-        assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof ByteArrayInputStream);
-        assertTrue(((DelegatingInputStream) attBody).getInputStream() instanceof ByteArrayInputStream);
-        
         // check the cached output stream
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         IOUtils.copy(attBody, out);
@@ -274,11 +252,12 @@
         
         InputStream attIs = a.getDataHandler().getInputStream();
         
-        // We need to cache the InputStream for reusing the AttachmentDataSource
-        //assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof MimeBodyPartInputStream);
-        assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof ByteArrayInputStream);
-        assertTrue(((DelegatingInputStream) attBody).getInputStream() instanceof FileInputStream);
+        assertFalse(itr.hasNext());
         
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        IOUtils.copy(attIs, out);
+        assertTrue(out.size() > 1000);
+
     }
     
     

Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java Tue Jul 28 15:26:25 2009
@@ -100,6 +100,7 @@
     private static final Logger LOG = LogUtils.getL7dLogger(JAXRSUtils.class);
     private static final ResourceBundle BUNDLE = BundleUtils.getBundle(JAXRSUtils.class);
     private static final String PROPOGATE_EXCEPTION = "org.apache.cxf.propogate.exception";
+    private static final String FORM_PARAM_MAP = JAXRSUtils.class.getName() + ".FORM_DATA";
 
     private JAXRSUtils() {        
     }
@@ -592,19 +593,25 @@
         MessageContext mc = new MessageContextImpl(m);
         MediaType mt = mc.getHttpHeaders().getMediaType();
         
-        MultivaluedMap<String, String> params = new MetadataMap<String, String>();
+        @SuppressWarnings("unchecked")
+        MultivaluedMap<String, String> params = (MultivaluedMap<String, String>)m.get(FORM_PARAM_MAP); 
         
-        if (mt == null || mt.isCompatible(MediaType.APPLICATION_FORM_URLENCODED_TYPE)) {
-            String body = (String)m.get("org.apache.cxf.jaxrs.provider.form.body");
-            if (body == null) {
-                body = FormUtils.readBody(m.getContent(InputStream.class));
-                m.put("org.apache.cxf.jaxrs.provider.form.body", body);
+        if (params == null) {
+            params = new MetadataMap<String, String>();
+            m.put(FORM_PARAM_MAP, params);
+        
+            if (mt == null || mt.isCompatible(MediaType.APPLICATION_FORM_URLENCODED_TYPE)) {
+                String body = (String)m.get("org.apache.cxf.jaxrs.provider.form.body");
+                if (body == null) {
+                    body = FormUtils.readBody(m.getContent(InputStream.class));
+                    m.put("org.apache.cxf.jaxrs.provider.form.body", body);
+                }
+                HttpServletRequest request = (HttpServletRequest)m.get(AbstractHTTPDestination.HTTP_REQUEST);
+                FormUtils.populateMapFromString(params, (String)body, decode, request);
+            } else {
+                MultipartBody body = AttachmentUtils.getMultipartBody(mc);
+                FormUtils.populateMapFromMultipart(params, body, decode);
             }
-            HttpServletRequest request = (HttpServletRequest)m.get(AbstractHTTPDestination.HTTP_REQUEST);
-            FormUtils.populateMapFromString(params, (String)body, decode, request);
-        } else {
-            MultipartBody body = AttachmentUtils.getMultipartBody(mc);
-            FormUtils.populateMapFromMultipart(params, body, decode);
         }
         
         if ("".equals(key)) {

Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java (original)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java Tue Jul 28 15:26:25 2009
@@ -54,6 +54,7 @@
 import org.apache.cxf.hello_world_jms.HelloWorldServiceRuntimeCorrelationIDDynamicPrefix;
 import org.apache.cxf.hello_world_jms.HelloWorldServiceRuntimeCorrelationIDStaticPrefix;
 import org.apache.cxf.hello_world_jms.NoSuchCodeLitFault;
+import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
 import org.apache.cxf.jms_greeter.JMSGreeterPortType;
 import org.apache.cxf.jms_greeter.JMSGreeterService;
@@ -909,8 +910,9 @@
         handler1.value = new DataHandler(fileURL);
         int size = handler1.value.getInputStream().available();
         mtom.testDataHandler(name, handler1);
-        int size2 = handler1.value.getInputStream().available();
-        assertTrue("The response file is not same with the sent file.", size == size2);
+        
+        byte bytes[] = IOUtils.readBytesFromStream(handler1.value.getInputStream());
+        assertEquals("The response file is not same with the sent file.", size, bytes.length);
     }
     
     @Test

Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom/ClientMtomXopTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom/ClientMtomXopTest.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom/ClientMtomXopTest.java (original)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom/ClientMtomXopTest.java Tue Jul 28 15:26:25 2009
@@ -30,11 +30,11 @@
 import javax.xml.ws.Holder;
 import javax.xml.ws.soap.SOAPBinding;
 
-import org.apache.axiom.attachments.utils.IOUtils;
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.endpoint.ClientImpl;
+import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.interceptor.LoggingInInterceptor;
 import org.apache.cxf.interceptor.LoggingOutInterceptor;
 import org.apache.cxf.jaxws.JaxWsClientProxy;
@@ -89,7 +89,6 @@
             
             ((BindingProvider)mtomPort).getRequestContext().put("schema-validation-enabled",
                                                                 Boolean.TRUE);
-            
             param.value = new DataHandler(new ByteArrayDataSource(data, "application/octet-stream"));
             Holder<String> name = new Holder<String>("call detail");
             mtomPort.testXop(name, param);
@@ -97,7 +96,7 @@
             assertNotNull(param.value);
             
             InputStream in = param.value.getInputStream();
-            byte bytes[] = IOUtils.getStreamAsByteArray(in);
+            byte bytes[] = IOUtils.readBytesFromStream(in);
             assertEquals(data.length, bytes.length);
             in.close();
 
@@ -108,10 +107,9 @@
             assertNotNull(param.value);
             
             in = param.value.getInputStream();
-            bytes = IOUtils.getStreamAsByteArray(in);
+            bytes = IOUtils.readBytesFromStream(in);
             assertEquals(data.length, bytes.length);
             in.close();
-
         } catch (UndeclaredThrowableException ex) {
             throw (Exception)ex.getCause();
         }