You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/04/09 10:33:57 UTC
[2/2] camel git commit: CAMEL-8410 Add stream caching for CxfPayload
with thanks to Stephan
CAMEL-8410 Add stream caching for CxfPayload with thanks to Stephan
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5d94b2a8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5d94b2a8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5d94b2a8
Branch: refs/heads/master
Commit: 5d94b2a8f748438229b786ef8b4d132620e26a21
Parents: ea70233
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Apr 9 14:21:49 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Apr 9 16:31:00 2015 +0800
----------------------------------------------------------------------
.../apache/camel/component/cxf/CxfPayload.java | 6 +-
.../cxf/converter/CachedCxfPayload.java | 371 +++++++++++++++++++
.../cxf/converter/CxfPayloadConverter.java | 32 +-
.../cxf/converter/CachedCxfPayloadTest.java | 103 +++++
.../CxfPayLoadStreamCacheRouterTest.java | 50 +++
.../cxf/converter/CxfPayloadConverterTest.java | 11 +-
.../cxf/converter/PayLoadConvertToPOJOTest.java | 2 +-
7 files changed, 547 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfPayload.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfPayload.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfPayload.java
index abdf682..40c4be2 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfPayload.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfPayload.java
@@ -139,7 +139,11 @@ public class CxfPayload<T> {
public List<T> getHeaders() {
return headers;
}
-
+
+ public Map<String, String> getNsMap() {
+ return nsMap;
+ }
+
public String toString() {
// do not load or print the payload body etc as we do not want to load that into memory etc
return super.toString();
http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
new file mode 100644
index 0000000..d54a026
--- /dev/null
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.converter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ListIterator;
+import java.util.Map;
+
+import javax.xml.namespace.NamespaceContext;
+import javax.xml.namespace.QName;
+import javax.xml.stream.Location;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.Source;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stax.StAXSource;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
+
+import org.w3c.dom.Document;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.StreamCache;
+import org.apache.camel.component.cxf.CxfPayload;
+import org.apache.camel.converter.jaxp.XmlConverter;
+import org.apache.camel.converter.stream.StreamSourceCache;
+import org.apache.cxf.staxutils.StaxSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CachedCxfPayload<T> extends CxfPayload<T> implements StreamCache {
+ private static final Logger LOG = LoggerFactory.getLogger(CachedCxfPayload.class);
+ private final XmlConverter xml;
+
+ public CachedCxfPayload(CxfPayload<T> orig, Exchange exchange, XmlConverter xml) {
+ super(orig.getHeaders(), new ArrayList<Source>(orig.getBodySources()), orig.getNsMap());
+ ListIterator<Source> li = getBodySources().listIterator();
+ this.xml = xml;
+ while (li.hasNext()) {
+ Source source = li.next();
+ XMLStreamReader reader = null;
+ // namespace definitions that are on the SOAP envelope can get lost, if this is
+ // not a DOM (there is special coding on the CXFPayload.getBody().get() method for
+ // this, that only works on DOM nodes.
+ // We have to do some delegation on the XMLStreamReader for StAXSource and StaxSource
+ // that re-injects the missing namespaces into the XMLStreamReader.
+ // Replace all other Sources that are not DOMSources with DOMSources.
+ if (source instanceof StaxSource) {
+ reader = ((StaxSource) source).getXMLStreamReader();
+ } else if (source instanceof StAXSource) {
+ reader = ((StAXSource) source).getXMLStreamReader();
+ }
+ if (reader != null) {
+ Map<String, String> nsmap = getNsMap();
+ if (nsmap == null) {
+ nsmap = Collections.emptyMap();
+ }
+ source = new StAXSource(new DelegatingXMLStreamReader(reader, nsmap));
+ StreamSource streamSource = exchange.getContext().getTypeConverter().convertTo(StreamSource.class, exchange, source);
+ if (streamSource != null) {
+ try {
+ li.set(new StreamSourceCache(streamSource, exchange));
+ } catch (IOException e) {
+ LOG.error("Cannot Create StreamSourceCache ", e);
+ }
+ }
+ } else if (!(source instanceof DOMSource)) {
+ Document document = exchange.getContext().getTypeConverter().convertTo(Document.class, exchange, source);
+ if (document != null) {
+ li.set(new DOMSource(document));
+ }
+ }
+ }
+ }
+
+ private CachedCxfPayload(CachedCxfPayload<T> orig) throws IOException {
+ super(orig.getHeaders(), new ArrayList<Source>(orig.getBodySources()), orig.getNsMap());
+ ListIterator<Source> li = getBodySources().listIterator();
+ this.xml = orig.xml;
+ while (li.hasNext()) {
+ Source source = li.next();
+ if (source instanceof StreamCache) {
+ li.set((Source) (((StreamCache) source)).copy());
+ }
+ }
+ }
+
+ @Override
+ public void reset() {
+ for (Source source : getBodySources()) {
+ if (source instanceof StreamCache) {
+ ((StreamCache) source).reset();
+ }
+ }
+ }
+
+ @Override
+ public void writeTo(OutputStream os) throws IOException {
+ // no body no write
+ if (getBodySources().size() == 0) {
+ return;
+ }
+ Source body = getBodySources().get(0);
+ if (body instanceof StreamCache) {
+ ((StreamCache) body).writeTo(os);
+ } else {
+ StreamResult sr = new StreamResult(os);
+ try {
+ xml.toResult(body, sr);
+ } catch (TransformerException e) {
+ throw new IOException("Transformation failed", e);
+ }
+ }
+ }
+
+ @Override
+ public boolean inMemory() {
+ boolean inMemory = true;
+ for (Source source : getBodySources()) {
+ if (source instanceof StreamCache && !((StreamCache) source).inMemory()) {
+ inMemory = false;
+ }
+ }
+ return inMemory;
+ }
+
+ @Override
+ public long length() {
+ return 0;
+ }
+
+ @Override
+ public StreamCache copy() throws IOException {
+ return new CachedCxfPayload<T>(this);
+ }
+
+ private static class DelegatingXMLStreamReader implements XMLStreamReader {
+ private XMLStreamReader reader;
+ private final Map<String, String> nsmap;
+ private final String[] prefixes;
+
+ public DelegatingXMLStreamReader(XMLStreamReader reader, Map<String, String> nsmap) {
+ this.reader = reader;
+ this.nsmap = nsmap;
+ this.prefixes = nsmap.keySet().toArray(new String[0]);
+ }
+
+ @Override
+ public Object getProperty(String name) throws IllegalArgumentException {
+ return reader.getProperty(name);
+ }
+
+ @Override
+ public int next() throws XMLStreamException {
+ return reader.next();
+ }
+
+ @Override
+ public void require(int type, String namespaceURI, String localName) throws XMLStreamException {
+ reader.require(type, namespaceURI, localName);
+ }
+
+ @Override
+ public String getElementText() throws XMLStreamException {
+ return reader.getElementText();
+ }
+
+ @Override
+ public int nextTag() throws XMLStreamException {
+ return reader.nextTag();
+ }
+
+ @Override
+ public boolean hasNext() throws XMLStreamException {
+ return reader.hasNext();
+ }
+
+ @Override
+ public void close() throws XMLStreamException {
+ reader.close();
+ }
+
+ @Override
+ public String getNamespaceURI(String prefix) {
+ String nsuri = reader.getNamespaceURI();
+ if (nsuri == null) {
+ nsuri = nsmap.get(prefix);
+ }
+ return nsuri;
+ }
+
+ @Override
+ public boolean isStartElement() {
+ return reader.isStartElement();
+ }
+
+ @Override
+ public boolean isEndElement() {
+ return reader.isEndElement();
+ }
+
+ @Override
+ public boolean isCharacters() {
+ return reader.isCharacters();
+ }
+
+ public boolean isWhiteSpace() {
+ return reader.isWhiteSpace();
+ }
+
+ public String getAttributeValue(String namespaceURI, String localName) {
+ return reader.getAttributeValue(namespaceURI, localName);
+ }
+
+ public int getAttributeCount() {
+ return reader.getAttributeCount();
+ }
+
+ public QName getAttributeName(int index) {
+ return reader.getAttributeName(index);
+ }
+
+ public String getAttributeNamespace(int index) {
+ return reader.getAttributeNamespace(index);
+ }
+
+ public String getAttributeLocalName(int index) {
+ return reader.getAttributeLocalName(index);
+ }
+
+ public String getAttributePrefix(int index) {
+ return reader.getAttributePrefix(index);
+ }
+
+ public String getAttributeType(int index) {
+ return reader.getAttributeType(index);
+ }
+
+ public String getAttributeValue(int index) {
+ return reader.getAttributeValue(index);
+ }
+
+ public boolean isAttributeSpecified(int index) {
+ return reader.isAttributeSpecified(index);
+ }
+
+ public int getNamespaceCount() {
+ return prefixes.length + reader.getNamespaceCount();
+ }
+
+ public String getNamespacePrefix(int index) {
+ if (index < prefixes.length) {
+ return prefixes[index];
+ } else {
+ return reader.getNamespacePrefix(index - prefixes.length);
+ }
+ }
+
+ public String getNamespaceURI(int index) {
+ if (index < prefixes.length) {
+ return nsmap.get(prefixes[index]);
+ } else {
+ return reader.getNamespaceURI(index - prefixes.length);
+ }
+ }
+
+ public NamespaceContext getNamespaceContext() {
+ return reader.getNamespaceContext();
+ }
+
+ public int getEventType() {
+ return reader.getEventType();
+ }
+
+ public String getText() {
+ return reader.getText();
+ }
+
+ public char[] getTextCharacters() {
+ return reader.getTextCharacters();
+ }
+
+ public int getTextCharacters(int sourceStart, char[] target, int targetStart, int length) throws XMLStreamException {
+ return reader.getTextCharacters(sourceStart, target, targetStart, length);
+ }
+
+ public int getTextStart() {
+ return reader.getTextStart();
+ }
+
+ public int getTextLength() {
+ return reader.getTextLength();
+ }
+
+ public String getEncoding() {
+ return reader.getEncoding();
+ }
+
+ public boolean hasText() {
+ return reader.hasText();
+ }
+
+ public Location getLocation() {
+ return reader.getLocation();
+ }
+
+ public QName getName() {
+ return reader.getName();
+ }
+
+ public String getLocalName() {
+ return reader.getLocalName();
+ }
+
+ public boolean hasName() {
+ return reader.hasName();
+ }
+
+ public String getNamespaceURI() {
+ return reader.getNamespaceURI();
+ }
+
+ public String getPrefix() {
+ return reader.getPrefix();
+ }
+
+ public String getVersion() {
+ return reader.getVersion();
+ }
+
+ public boolean isStandalone() {
+ return reader.isStandalone();
+ }
+
+ public boolean standaloneSet() {
+ return reader.standaloneSet();
+ }
+
+ public String getCharacterEncodingScheme() {
+ return reader.getCharacterEncodingScheme();
+ }
+
+ public String getPITarget() {
+ return reader.getPITarget();
+ }
+
+ public String getPIData() {
+ return reader.getPIData();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CxfPayloadConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CxfPayloadConverter.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CxfPayloadConverter.java
index ac0713e..7b8da00 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CxfPayloadConverter.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CxfPayloadConverter.java
@@ -25,24 +25,25 @@ import java.util.List;
import javax.xml.stream.XMLStreamException;
import javax.xml.transform.Source;
import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.sax.SAXSource;
import javax.xml.transform.stream.StreamSource;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
-
import org.apache.camel.Converter;
import org.apache.camel.Exchange;
import org.apache.camel.FallbackConverter;
+import org.apache.camel.StreamCache;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.cxf.CxfPayload;
+import org.apache.camel.converter.jaxp.XmlConverter;
import org.apache.camel.spi.TypeConverterRegistry;
import org.apache.cxf.staxutils.StaxUtils;
@Converter
public final class CxfPayloadConverter {
+ private static XmlConverter xml = new XmlConverter();
private CxfPayloadConverter() {
// Helper class
@@ -108,6 +109,11 @@ public final class CxfPayloadConverter {
return null;
}
+ @Converter
+ public static <T> StreamCache cxfPayLoadToStreamCache(CxfPayload<T> payload, Exchange exchange) {
+ return new CachedCxfPayload<T>(payload, exchange, xml);
+ }
+
@SuppressWarnings("unchecked")
@FallbackConverter
public static <T> T convertTo(Class<T> type, Exchange exchange, Object value, TypeConverterRegistry registry) {
@@ -179,7 +185,6 @@ public final class CxfPayloadConverter {
} catch (XMLStreamException e) {
throw new RuntimeException(e);
}
- payload.getBodySources().set(0, new DOMSource(d.getDocumentElement()));
return type.cast(d);
}
// CAMEL-8410 Just make sure we get the Source object directly from the payload body source
@@ -189,28 +194,7 @@ public final class CxfPayloadConverter {
}
TypeConverter tc = registry.lookup(type, Source.class);
if (tc != null) {
- if ((s instanceof StreamSource
- || s instanceof SAXSource)
- && !type.isAssignableFrom(Document.class)
- && !type.isAssignableFrom(Source.class)) {
- //non-reproducible sources, we need to convert to DOMSource first
- //or the payload will get wiped out
- Document d;
- try {
- d = StaxUtils.read(s);
- } catch (XMLStreamException e) {
- throw new RuntimeException(e);
- }
- s = new DOMSource(d.getDocumentElement());
- payload.getBodySources().set(0, s);
- }
-
T t = tc.convertTo(type, s);
- if (t instanceof Document) {
- payload.getBodySources().set(0, new DOMSource(((Document)t).getDocumentElement()));
- } else if (t instanceof Source) {
- payload.getBodySources().set(0, (Source)t);
- }
return t;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
new file mode 100644
index 0000000..6c7618b
--- /dev/null
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.converter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.StringReader;
+
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.sax.SAXSource;
+import javax.xml.transform.stax.StAXSource;
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.TypeConversionException;
+import org.apache.camel.component.cxf.CxfPayload;
+import org.apache.camel.converter.jaxp.XmlConverter;
+import org.apache.camel.test.junit4.ExchangeTestSupport;
+import org.apache.cxf.staxutils.StaxSource;
+import org.apache.cxf.staxutils.StaxUtils;
+import org.junit.Test;
+
+public class CachedCxfPayloadTest extends ExchangeTestSupport {
+ private static final String PAYLOAD = "<foo>bar</foo>";
+
+ @Test
+ public void testCachedCxfPayloadSAXSource() throws TypeConversionException, NoTypeConversionAvailableException, IOException {
+ SAXSource source = context.getTypeConverter().mandatoryConvertTo(SAXSource.class, PAYLOAD);
+ doTest(source);
+ }
+
+ @Test
+ public void testCachedCxfPayloadStAXSource() throws TypeConversionException, NoTypeConversionAvailableException, IOException {
+ StAXSource source = context.getTypeConverter().mandatoryConvertTo(StAXSource.class, PAYLOAD);
+ doTest(source);
+ }
+
+ @Test
+ public void testCachedCxfPayloadStaxSource() throws TypeConversionException, NoTypeConversionAvailableException, IOException {
+ XMLStreamReader streamReader = StaxUtils.createXMLStreamReader(new StreamSource(new StringReader(PAYLOAD)));
+ StaxSource source = new StaxSource(streamReader);
+ doTest(source);
+ }
+
+ @Test
+ public void testCachedCxfPayloadDOMSource() throws TypeConversionException, NoTypeConversionAvailableException, IOException {
+ DOMSource source = context.getTypeConverter().mandatoryConvertTo(DOMSource.class, PAYLOAD);
+ doTest(source);
+ }
+
+ @Test
+ public void testCachedCxfPayloadStreamSource() throws TypeConversionException, NoTypeConversionAvailableException, IOException {
+ StreamSource source = context.getTypeConverter().mandatoryConvertTo(StreamSource.class, PAYLOAD);
+ doTest(source);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private void doTest(Object source) throws IOException {
+ CxfPayload<?> originalPayload = context.getTypeConverter().convertTo(CxfPayload.class, source);
+ CachedCxfPayload<?> cache = new CachedCxfPayload(originalPayload, exchange, new XmlConverter());
+
+ assertTrue(cache.inMemory());
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ cache.writeTo(bos);
+
+ String s = context.getTypeConverter().convertTo(String.class, bos);
+ assertEquals(PAYLOAD, s);
+
+ cache.reset();
+
+ CachedCxfPayload clone = (CachedCxfPayload) cache.copy();
+ bos = new ByteArrayOutputStream();
+ clone.writeTo(bos);
+
+ s = context.getTypeConverter().convertTo(String.class, bos);
+ assertEquals(PAYLOAD, s);
+
+ cache.reset();
+ clone.reset();
+
+ s = context.getTypeConverter().convertTo(String.class, cache);
+ assertEquals(PAYLOAD, s);
+
+ s = context.getTypeConverter().convertTo(String.class, clone);
+ assertEquals(PAYLOAD, s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayLoadStreamCacheRouterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayLoadStreamCacheRouterTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayLoadStreamCacheRouterTest.java
new file mode 100644
index 0000000..1262ca9
--- /dev/null
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayLoadStreamCacheRouterTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.converter;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.StreamCache;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cxf.CxfSimpleRouterTest;
+
+/**
+ * A unit test for testing reading SOAP body in PAYLOAD mode.
+ *
+ * @version
+ */
+public class CxfPayLoadStreamCacheRouterTest extends CxfSimpleRouterTest {
+
+ private String routerEndpointURI = "cxf://" + getRouterAddress() + "?" + SERVICE_CLASS + "&dataFormat=PAYLOAD";
+ private String serviceEndpointURI = "cxf://" + getServiceAddress() + "?" + SERVICE_CLASS + "&dataFormat=PAYLOAD";
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ // START SNIPPET: payload
+ from(routerEndpointURI).streamCaching().process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ Object payload = exchange.getIn().getBody();
+ assertTrue("payload is not a StreamCache", payload instanceof StreamCache);
+ }
+ })
+ .to(serviceEndpointURI);
+ // END SNIPPET: payload
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayloadConverterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayloadConverterTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayloadConverterTest.java
index b969073..17ffc16 100644
--- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayloadConverterTest.java
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayloadConverterTest.java
@@ -31,7 +31,7 @@ import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
-
+import org.apache.camel.StreamCache;
import org.apache.camel.component.cxf.CxfPayload;
import org.apache.camel.test.junit4.ExchangeTestSupport;
import org.junit.Before;
@@ -86,7 +86,14 @@ public class CxfPayloadConverterTest extends ExchangeTestSupport {
assertNotNull(nodeList);
assertEquals("Get a worng size of nodeList", 1, nodeList.getLength());
}
-
+
+ @Test
+ public void testCxfPayloadToStreamCache() {
+ StreamCache streamCache = CxfPayloadConverter.cxfPayLoadToStreamCache(payload, exchange);
+ assertNotNull(streamCache);
+ assertTrue(streamCache instanceof CxfPayload);
+ }
+
@Test
public void testToCxfPayload() {
// use default type converter
http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/PayLoadConvertToPOJOTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/PayLoadConvertToPOJOTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/PayLoadConvertToPOJOTest.java
index c62c7d5..d7b1b08 100644
--- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/PayLoadConvertToPOJOTest.java
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/PayLoadConvertToPOJOTest.java
@@ -88,7 +88,7 @@ public class PayLoadConvertToPOJOTest extends CamelTestSupport {
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from("cxf:bean:routerEndpoint?dataFormat=PAYLOAD").process(new Processor() {
+ from("cxf:bean:routerEndpoint?dataFormat=PAYLOAD").streamCaching().process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {