You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/07/28 17:26:28 UTC
svn commit: r798561 - in /cxf/trunk:
rt/core/src/main/java/org/apache/cxf/attachment/
rt/core/src/test/java/org/apache/cxf/attachment/
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/
systests/src/test/java/org/apache/cxf/systest/jms/ systes...
Author: dkulp
Date: Tue Jul 28 15:26:25 2009
New Revision: 798561
URL: http://svn.apache.org/viewvc?rev=798561&view=rev
Log:
Update to really allow streaming of mtom attachments with jaxb
Also close the underlying stream
Modified:
cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDataSource.java
cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDeserializer.java
cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/DelegatingInputStream.java
cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/LazyAttachmentCollection.java
cxf/trunk/rt/core/src/test/java/org/apache/cxf/attachment/AttachmentDeserializerTest.java
cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom/ClientMtomXopTest.java
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDataSource.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDataSource.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDataSource.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDataSource.java Tue Jul 28 15:26:25 2009
@@ -31,17 +31,35 @@
public class AttachmentDataSource implements DataSource {
private final String ct;
- private final CachedOutputStream cache;
+ private CachedOutputStream cache;
+ private InputStream ins;
+ private DelegatingInputStream lastIns;
public AttachmentDataSource(String ctParam, InputStream inParam) throws IOException {
this.ct = ctParam;
- cache = new CachedOutputStream();
- IOUtils.copy(inParam, cache);
- cache.lockOutputStream();
+ ins = inParam;
}
+ public boolean isCached() {
+ return cache != null;
+ }
+
public void hold() {
- cache.holdTempFile();
+ try {
+ if (cache == null) {
+ cache = new CachedOutputStream();
+ IOUtils.copy(ins, cache);
+ cache.lockOutputStream();
+ cache.holdTempFile();
+ ins.close();
+ ins = null;
+ if (lastIns != null) {
+ lastIns.setInputStream(cache.getInputStream());
+ }
+ }
+ } catch (IOException e) {
+ //shouldn't happen
+ }
}
public void release() {
cache.releaseTempFileHold();
@@ -53,9 +71,14 @@
public InputStream getInputStream() {
try {
- return new DelegatingInputStream(cache.getInputStream());
+ if (cache != null) {
+ return cache.getInputStream();
+ }
+ if (ins instanceof DelegatingInputStream) {
+ lastIns = (DelegatingInputStream)ins;
+ }
+ return ins;
} catch (IOException e) {
- e.printStackTrace();
return null;
}
}
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDeserializer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDeserializer.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDeserializer.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/AttachmentDeserializer.java Tue Jul 28 15:26:25 2009
@@ -58,6 +58,8 @@
private int pbAmount = 2048;
private PushbackInputStream stream;
+ private int createCount;
+ private int closedCount;
private byte boundary[];
@@ -122,7 +124,9 @@
throw new RuntimeException(e);
}
- body = new DelegatingInputStream(new MimeBodyPartInputStream(stream, boundary, pbAmount));
+ body = new DelegatingInputStream(new MimeBodyPartInputStream(stream, boundary, pbAmount),
+ this);
+ createCount++;
message.setContent(InputStream.class, body);
}
}
@@ -199,13 +203,15 @@
&& !((DelegatingInputStream) body).isClosed()) {
cache((DelegatingInputStream) body, true);
- message.setContent(InputStream.class, body);
}
for (Attachment a : attachments.getLoadedAttachments()) {
DataSource s = a.getDataHandler().getDataSource();
- if (!(s instanceof AttachmentDataSource)) {
- //AttachementDataSource objects are already cached
+ if (s instanceof AttachmentDataSource) {
+ if (!((AttachmentDataSource)s).isCached()) {
+ cache((DelegatingInputStream) s.getInputStream(), false);
+ }
+ } else {
cache((DelegatingInputStream) s.getInputStream(), false);
}
}
@@ -279,7 +285,9 @@
*/
private Attachment createAttachment(InternetHeaders headers) throws IOException {
InputStream partStream =
- new DelegatingInputStream(new MimeBodyPartInputStream(stream, boundary, pbAmount));
+ new DelegatingInputStream(new MimeBodyPartInputStream(stream, boundary, pbAmount),
+ this);
+ createCount++;
return AttachmentUtil.createAttachment(partStream, headers);
}
@@ -290,4 +298,11 @@
public void setLazyLoading(boolean lazyLoading) {
this.lazyLoading = lazyLoading;
}
+
+ public void markClosed(DelegatingInputStream delegatingInputStream) throws IOException {
+ closedCount++;
+ if (closedCount == createCount && !attachments.hasNext()) {
+ stream.close();
+ }
+ }
}
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/DelegatingInputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/DelegatingInputStream.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/DelegatingInputStream.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/DelegatingInputStream.java Tue Jul 28 15:26:25 2009
@@ -25,19 +25,24 @@
final class DelegatingInputStream extends InputStream {
private InputStream is;
+ private AttachmentDeserializer deserializer;
private boolean isClosed;
/**
* @param source
*/
- DelegatingInputStream(InputStream is) {
+ DelegatingInputStream(InputStream is, AttachmentDeserializer ads) {
this.is = is;
+ deserializer = ads;
}
@Override
public void close() throws IOException {
is.close();
isClosed = true;
+ if (!isClosed) {
+ deserializer.markClosed(this);
+ }
}
public boolean isClosed() {
Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/LazyAttachmentCollection.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/LazyAttachmentCollection.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/LazyAttachmentCollection.java (original)
+++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/attachment/LazyAttachmentCollection.java Tue Jul 28 15:26:25 2009
@@ -58,6 +58,15 @@
throw new RuntimeException(e);
}
}
+ public boolean hasNext() throws IOException {
+ Attachment a = deserializer.readNext();
+ if (a != null) {
+ attachments.add(a);
+ return true;
+ }
+ return false;
+ }
+
public Iterator<Attachment> iterator() {
return new Iterator<Attachment>() {
Modified: cxf/trunk/rt/core/src/test/java/org/apache/cxf/attachment/AttachmentDeserializerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/test/java/org/apache/cxf/attachment/AttachmentDeserializerTest.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/test/java/org/apache/cxf/attachment/AttachmentDeserializerTest.java (original)
+++ cxf/trunk/rt/core/src/test/java/org/apache/cxf/attachment/AttachmentDeserializerTest.java Tue Jul 28 15:26:25 2009
@@ -20,7 +20,6 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.FileInputStream;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Collection;
@@ -76,11 +75,6 @@
InputStream attIs = a.getDataHandler().getInputStream();
- // We need to cache the InputStream for reusing the AttachmentDataSource
- //assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof MimeBodyPartInputStream);
- assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof ByteArrayInputStream);
- assertTrue(((DelegatingInputStream) attBody).getInputStream() instanceof ByteArrayInputStream);
-
// check the cached output stream
ByteArrayOutputStream out = new ByteArrayOutputStream();
IOUtils.copy(attBody, out);
@@ -125,11 +119,6 @@
InputStream attIs = a.getDataHandler().getInputStream();
- // We need to cache the InputStream for reusing the AttachmentDataSource
- //assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof MimeBodyPartInputStream);
- assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof ByteArrayInputStream);
- assertTrue(((DelegatingInputStream) attBody).getInputStream() instanceof ByteArrayInputStream);
-
// check the cached output stream
ByteArrayOutputStream out = new ByteArrayOutputStream();
IOUtils.copy(attBody, out);
@@ -174,12 +163,6 @@
InputStream attIs = a.getDataHandler().getInputStream();
- // We need to cache the InputStream for reusing the AttachmentDataSource
- //assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof MimeBodyPartInputStream);
- assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof ByteArrayInputStream);
-
- assertTrue(((DelegatingInputStream) attBody).getInputStream() instanceof ByteArrayInputStream);
-
// check the cached output stream
ByteArrayOutputStream out = new ByteArrayOutputStream();
IOUtils.copy(attBody, out);
@@ -222,11 +205,6 @@
InputStream attIs = a.getDataHandler().getInputStream();
- // We need to cache the InputStream for reusing the AttachmentDataSource
- //assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof MimeBodyPartInputStream);
- assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof ByteArrayInputStream);
- assertTrue(((DelegatingInputStream) attBody).getInputStream() instanceof ByteArrayInputStream);
-
// check the cached output stream
ByteArrayOutputStream out = new ByteArrayOutputStream();
IOUtils.copy(attBody, out);
@@ -274,11 +252,12 @@
InputStream attIs = a.getDataHandler().getInputStream();
- // We need to cache the InputStream for reusing the AttachmentDataSource
- //assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof MimeBodyPartInputStream);
- assertTrue(((DelegatingInputStream) attIs).getInputStream() instanceof ByteArrayInputStream);
- assertTrue(((DelegatingInputStream) attBody).getInputStream() instanceof FileInputStream);
+ assertFalse(itr.hasNext());
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ IOUtils.copy(attIs, out);
+ assertTrue(out.size() > 1000);
+
}
Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/utils/JAXRSUtils.java Tue Jul 28 15:26:25 2009
@@ -100,6 +100,7 @@
private static final Logger LOG = LogUtils.getL7dLogger(JAXRSUtils.class);
private static final ResourceBundle BUNDLE = BundleUtils.getBundle(JAXRSUtils.class);
private static final String PROPOGATE_EXCEPTION = "org.apache.cxf.propogate.exception";
+ private static final String FORM_PARAM_MAP = JAXRSUtils.class.getName() + ".FORM_DATA";
private JAXRSUtils() {
}
@@ -592,19 +593,25 @@
MessageContext mc = new MessageContextImpl(m);
MediaType mt = mc.getHttpHeaders().getMediaType();
- MultivaluedMap<String, String> params = new MetadataMap<String, String>();
+ @SuppressWarnings("unchecked")
+ MultivaluedMap<String, String> params = (MultivaluedMap<String, String>)m.get(FORM_PARAM_MAP);
- if (mt == null || mt.isCompatible(MediaType.APPLICATION_FORM_URLENCODED_TYPE)) {
- String body = (String)m.get("org.apache.cxf.jaxrs.provider.form.body");
- if (body == null) {
- body = FormUtils.readBody(m.getContent(InputStream.class));
- m.put("org.apache.cxf.jaxrs.provider.form.body", body);
+ if (params == null) {
+ params = new MetadataMap<String, String>();
+ m.put(FORM_PARAM_MAP, params);
+
+ if (mt == null || mt.isCompatible(MediaType.APPLICATION_FORM_URLENCODED_TYPE)) {
+ String body = (String)m.get("org.apache.cxf.jaxrs.provider.form.body");
+ if (body == null) {
+ body = FormUtils.readBody(m.getContent(InputStream.class));
+ m.put("org.apache.cxf.jaxrs.provider.form.body", body);
+ }
+ HttpServletRequest request = (HttpServletRequest)m.get(AbstractHTTPDestination.HTTP_REQUEST);
+ FormUtils.populateMapFromString(params, (String)body, decode, request);
+ } else {
+ MultipartBody body = AttachmentUtils.getMultipartBody(mc);
+ FormUtils.populateMapFromMultipart(params, body, decode);
}
- HttpServletRequest request = (HttpServletRequest)m.get(AbstractHTTPDestination.HTTP_REQUEST);
- FormUtils.populateMapFromString(params, (String)body, decode, request);
- } else {
- MultipartBody body = AttachmentUtils.getMultipartBody(mc);
- FormUtils.populateMapFromMultipart(params, body, decode);
}
if ("".equals(key)) {
Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java (original)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java Tue Jul 28 15:26:25 2009
@@ -54,6 +54,7 @@
import org.apache.cxf.hello_world_jms.HelloWorldServiceRuntimeCorrelationIDDynamicPrefix;
import org.apache.cxf.hello_world_jms.HelloWorldServiceRuntimeCorrelationIDStaticPrefix;
import org.apache.cxf.hello_world_jms.NoSuchCodeLitFault;
+import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.cxf.jms_greeter.JMSGreeterPortType;
import org.apache.cxf.jms_greeter.JMSGreeterService;
@@ -909,8 +910,9 @@
handler1.value = new DataHandler(fileURL);
int size = handler1.value.getInputStream().available();
mtom.testDataHandler(name, handler1);
- int size2 = handler1.value.getInputStream().available();
- assertTrue("The response file is not same with the sent file.", size == size2);
+
+ byte bytes[] = IOUtils.readBytesFromStream(handler1.value.getInputStream());
+ assertEquals("The response file is not same with the sent file.", size, bytes.length);
}
@Test
Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom/ClientMtomXopTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom/ClientMtomXopTest.java?rev=798561&r1=798560&r2=798561&view=diff
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom/ClientMtomXopTest.java (original)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/mtom/ClientMtomXopTest.java Tue Jul 28 15:26:25 2009
@@ -30,11 +30,11 @@
import javax.xml.ws.Holder;
import javax.xml.ws.soap.SOAPBinding;
-import org.apache.axiom.attachments.utils.IOUtils;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.endpoint.Client;
import org.apache.cxf.endpoint.ClientImpl;
+import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.interceptor.LoggingInInterceptor;
import org.apache.cxf.interceptor.LoggingOutInterceptor;
import org.apache.cxf.jaxws.JaxWsClientProxy;
@@ -89,7 +89,6 @@
((BindingProvider)mtomPort).getRequestContext().put("schema-validation-enabled",
Boolean.TRUE);
-
param.value = new DataHandler(new ByteArrayDataSource(data, "application/octet-stream"));
Holder<String> name = new Holder<String>("call detail");
mtomPort.testXop(name, param);
@@ -97,7 +96,7 @@
assertNotNull(param.value);
InputStream in = param.value.getInputStream();
- byte bytes[] = IOUtils.getStreamAsByteArray(in);
+ byte bytes[] = IOUtils.readBytesFromStream(in);
assertEquals(data.length, bytes.length);
in.close();
@@ -108,10 +107,9 @@
assertNotNull(param.value);
in = param.value.getInputStream();
- bytes = IOUtils.getStreamAsByteArray(in);
+ bytes = IOUtils.readBytesFromStream(in);
assertEquals(data.length, bytes.length);
in.close();
-
} catch (UndeclaredThrowableException ex) {
throw (Exception)ex.getCause();
}