You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/04/09 10:33:57 UTC

[2/2] camel git commit: CAMEL-8410 Add stream caching for CxfPayload with thanks to Stephan

CAMEL-8410 Add stream caching for CxfPayload with thanks to Stephan


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5d94b2a8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5d94b2a8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5d94b2a8

Branch: refs/heads/master
Commit: 5d94b2a8f748438229b786ef8b4d132620e26a21
Parents: ea70233
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Apr 9 14:21:49 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Apr 9 16:31:00 2015 +0800

----------------------------------------------------------------------
 .../apache/camel/component/cxf/CxfPayload.java  |   6 +-
 .../cxf/converter/CachedCxfPayload.java         | 371 +++++++++++++++++++
 .../cxf/converter/CxfPayloadConverter.java      |  32 +-
 .../cxf/converter/CachedCxfPayloadTest.java     | 103 +++++
 .../CxfPayLoadStreamCacheRouterTest.java        |  50 +++
 .../cxf/converter/CxfPayloadConverterTest.java  |  11 +-
 .../cxf/converter/PayLoadConvertToPOJOTest.java |   2 +-
 7 files changed, 547 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfPayload.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfPayload.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfPayload.java
index abdf682..40c4be2 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfPayload.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfPayload.java
@@ -139,7 +139,11 @@ public class CxfPayload<T> {
     public List<T> getHeaders() {
         return headers;
     }
-    
+
+    public Map<String, String> getNsMap() {
+        return nsMap;
+    }
+
     public String toString() {
         // do not load or print the payload body etc as we do not want to load that into memory etc
         return super.toString();

http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
new file mode 100644
index 0000000..d54a026
--- /dev/null
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CachedCxfPayload.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.converter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ListIterator;
+import java.util.Map;
+
+import javax.xml.namespace.NamespaceContext;
+import javax.xml.namespace.QName;
+import javax.xml.stream.Location;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.Source;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stax.StAXSource;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
+
+import org.w3c.dom.Document;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.StreamCache;
+import org.apache.camel.component.cxf.CxfPayload;
+import org.apache.camel.converter.jaxp.XmlConverter;
+import org.apache.camel.converter.stream.StreamSourceCache;
+import org.apache.cxf.staxutils.StaxSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CachedCxfPayload<T> extends CxfPayload<T> implements StreamCache {
+    private static final Logger LOG = LoggerFactory.getLogger(CachedCxfPayload.class);
+    private final XmlConverter xml;
+
+    public CachedCxfPayload(CxfPayload<T> orig, Exchange exchange, XmlConverter xml) {
+        super(orig.getHeaders(), new ArrayList<Source>(orig.getBodySources()), orig.getNsMap());
+        ListIterator<Source> li = getBodySources().listIterator();
+        this.xml = xml;
+        while (li.hasNext()) {
+            Source source = li.next();
+            XMLStreamReader reader = null;
+            // namespace definitions that are on the SOAP envelope can get lost, if this is 
+            // not a DOM (there is special coding on the CXFPayload.getBody().get() method for 
+            // this, that only works on DOM nodes.
+            // We have to do some delegation on the XMLStreamReader for StAXSource and StaxSource
+            // that re-injects the missing namespaces into the XMLStreamReader.
+            // Replace all other Sources that are not DOMSources with DOMSources.
+            if (source instanceof StaxSource) {
+                reader = ((StaxSource) source).getXMLStreamReader();
+            } else if (source instanceof StAXSource) {
+                reader = ((StAXSource) source).getXMLStreamReader();
+            }
+            if (reader != null) {
+                Map<String, String> nsmap = getNsMap();
+                if (nsmap == null) {
+                    nsmap = Collections.emptyMap();
+                }
+                source = new StAXSource(new DelegatingXMLStreamReader(reader, nsmap));
+                StreamSource streamSource = exchange.getContext().getTypeConverter().convertTo(StreamSource.class, exchange, source);
+                if (streamSource != null) {
+                    try {
+                        li.set(new StreamSourceCache(streamSource, exchange));
+                    } catch (IOException e) {
+                        LOG.error("Cannot Create StreamSourceCache ", e);
+                    }
+                }
+            } else if (!(source instanceof DOMSource)) {
+                Document document = exchange.getContext().getTypeConverter().convertTo(Document.class, exchange, source);
+                if (document != null) {
+                    li.set(new DOMSource(document));
+                }
+            }
+        }
+    }
+
+    private CachedCxfPayload(CachedCxfPayload<T> orig) throws IOException {
+        super(orig.getHeaders(), new ArrayList<Source>(orig.getBodySources()), orig.getNsMap());
+        ListIterator<Source> li = getBodySources().listIterator();
+        this.xml = orig.xml;
+        while (li.hasNext()) {
+            Source source = li.next();
+            if (source instanceof StreamCache) {
+                li.set((Source) (((StreamCache) source)).copy());
+            }
+        }
+    }
+
+    @Override
+    public void reset() {
+        for (Source source : getBodySources()) {
+            if (source instanceof StreamCache) {
+                ((StreamCache) source).reset();
+            }
+        }
+    }
+
+    @Override
+    public void writeTo(OutputStream os) throws IOException {
+        // no body no write
+        if (getBodySources().size() == 0) {
+            return;
+        }
+        Source body = getBodySources().get(0);
+        if (body instanceof StreamCache) {
+            ((StreamCache) body).writeTo(os);
+        } else {
+            StreamResult sr = new StreamResult(os);
+            try {
+                xml.toResult(body, sr);
+            } catch (TransformerException e) {
+                throw new IOException("Transformation failed", e);
+            }
+        }
+    }
+
+    @Override
+    public boolean inMemory() {
+        boolean inMemory = true;
+        for (Source source : getBodySources()) {
+            if (source instanceof StreamCache && !((StreamCache) source).inMemory()) {
+                inMemory = false;
+            }
+        }
+        return inMemory;
+    }
+
+    @Override
+    public long length() {
+        return 0;
+    }
+
+    @Override
+    public StreamCache copy() throws IOException {
+        return new CachedCxfPayload<T>(this);
+    }
+
+    private static class DelegatingXMLStreamReader implements XMLStreamReader {
+        private XMLStreamReader reader;
+        private final Map<String, String> nsmap;
+        private final String[] prefixes;
+
+        public DelegatingXMLStreamReader(XMLStreamReader reader, Map<String, String> nsmap) {
+            this.reader = reader;
+            this.nsmap = nsmap;
+            this.prefixes = nsmap.keySet().toArray(new String[0]);
+        }
+
+        @Override
+        public Object getProperty(String name) throws IllegalArgumentException {
+            return reader.getProperty(name);
+        }
+
+        @Override
+        public int next() throws XMLStreamException {
+            return reader.next();
+        }
+
+        @Override
+        public void require(int type, String namespaceURI, String localName) throws XMLStreamException {
+            reader.require(type, namespaceURI, localName);
+        }
+
+        @Override
+        public String getElementText() throws XMLStreamException {
+            return reader.getElementText();
+        }
+
+        @Override
+        public int nextTag() throws XMLStreamException {
+            return reader.nextTag();
+        }
+
+        @Override
+        public boolean hasNext() throws XMLStreamException {
+            return reader.hasNext();
+        }
+
+        @Override
+        public void close() throws XMLStreamException {
+            reader.close();
+        }
+
+        @Override
+        public String getNamespaceURI(String prefix) {
+            String nsuri = reader.getNamespaceURI();
+            if (nsuri == null) {
+                nsuri = nsmap.get(prefix);
+            }
+            return nsuri;
+        }
+
+        @Override
+        public boolean isStartElement() {
+            return reader.isStartElement();
+        }
+
+        @Override
+        public boolean isEndElement() {
+            return reader.isEndElement();
+        }
+
+        @Override
+        public boolean isCharacters() {
+            return reader.isCharacters();
+        }
+
+        public boolean isWhiteSpace() {
+            return reader.isWhiteSpace();
+        }
+
+        public String getAttributeValue(String namespaceURI, String localName) {
+            return reader.getAttributeValue(namespaceURI, localName);
+        }
+
+        public int getAttributeCount() {
+            return reader.getAttributeCount();
+        }
+
+        public QName getAttributeName(int index) {
+            return reader.getAttributeName(index);
+        }
+
+        public String getAttributeNamespace(int index) {
+            return reader.getAttributeNamespace(index);
+        }
+
+        public String getAttributeLocalName(int index) {
+            return reader.getAttributeLocalName(index);
+        }
+
+        public String getAttributePrefix(int index) {
+            return reader.getAttributePrefix(index);
+        }
+
+        public String getAttributeType(int index) {
+            return reader.getAttributeType(index);
+        }
+
+        public String getAttributeValue(int index) {
+            return reader.getAttributeValue(index);
+        }
+
+        public boolean isAttributeSpecified(int index) {
+            return reader.isAttributeSpecified(index);
+        }
+
+        public int getNamespaceCount() {
+            return prefixes.length + reader.getNamespaceCount();
+        }
+
+        public String getNamespacePrefix(int index) {
+            if (index < prefixes.length) {
+                return prefixes[index];
+            } else {
+                return reader.getNamespacePrefix(index - prefixes.length);
+            }
+        }
+
+        public String getNamespaceURI(int index) {
+            if (index < prefixes.length) {
+                return nsmap.get(prefixes[index]);
+            } else {
+                return reader.getNamespaceURI(index - prefixes.length);
+            }
+        }
+
+        public NamespaceContext getNamespaceContext() {
+            return reader.getNamespaceContext();
+        }
+
+        public int getEventType() {
+            return reader.getEventType();
+        }
+
+        public String getText() {
+            return reader.getText();
+        }
+
+        public char[] getTextCharacters() {
+            return reader.getTextCharacters();
+        }
+
+        public int getTextCharacters(int sourceStart, char[] target, int targetStart, int length) throws XMLStreamException {
+            return reader.getTextCharacters(sourceStart, target, targetStart, length);
+        }
+
+        public int getTextStart() {
+            return reader.getTextStart();
+        }
+
+        public int getTextLength() {
+            return reader.getTextLength();
+        }
+
+        public String getEncoding() {
+            return reader.getEncoding();
+        }
+
+        public boolean hasText() {
+            return reader.hasText();
+        }
+
+        public Location getLocation() {
+            return reader.getLocation();
+        }
+
+        public QName getName() {
+            return reader.getName();
+        }
+
+        public String getLocalName() {
+            return reader.getLocalName();
+        }
+
+        public boolean hasName() {
+            return reader.hasName();
+        }
+
+        public String getNamespaceURI() {
+            return reader.getNamespaceURI();
+        }
+
+        public String getPrefix() {
+            return reader.getPrefix();
+        }
+
+        public String getVersion() {
+            return reader.getVersion();
+        }
+
+        public boolean isStandalone() {
+            return reader.isStandalone();
+        }
+
+        public boolean standaloneSet() {
+            return reader.standaloneSet();
+        }
+
+        public String getCharacterEncodingScheme() {
+            return reader.getCharacterEncodingScheme();
+        }
+
+        public String getPITarget() {
+            return reader.getPITarget();
+        }
+
+        public String getPIData() {
+            return reader.getPIData();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CxfPayloadConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CxfPayloadConverter.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CxfPayloadConverter.java
index ac0713e..7b8da00 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CxfPayloadConverter.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/converter/CxfPayloadConverter.java
@@ -25,24 +25,25 @@ import java.util.List;
 import javax.xml.stream.XMLStreamException;
 import javax.xml.transform.Source;
 import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.sax.SAXSource;
 import javax.xml.transform.stream.StreamSource;
 
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
-
 import org.apache.camel.Converter;
 import org.apache.camel.Exchange;
 import org.apache.camel.FallbackConverter;
+import org.apache.camel.StreamCache;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.component.cxf.CxfPayload;
+import org.apache.camel.converter.jaxp.XmlConverter;
 import org.apache.camel.spi.TypeConverterRegistry;
 import org.apache.cxf.staxutils.StaxUtils;
 
 @Converter
 public final class CxfPayloadConverter {
+    private static XmlConverter xml = new XmlConverter();
 
     private CxfPayloadConverter() {
         // Helper class
@@ -108,6 +109,11 @@ public final class CxfPayloadConverter {
         return null;
     }
 
+    @Converter
+    public static <T> StreamCache cxfPayLoadToStreamCache(CxfPayload<T> payload, Exchange exchange) {
+        return new CachedCxfPayload<T>(payload, exchange, xml);
+    }
+
     @SuppressWarnings("unchecked")
     @FallbackConverter
     public static <T> T convertTo(Class<T> type, Exchange exchange, Object value, TypeConverterRegistry registry) {
@@ -179,7 +185,6 @@ public final class CxfPayloadConverter {
                     } catch (XMLStreamException e) {
                         throw new RuntimeException(e);
                     }
-                    payload.getBodySources().set(0, new DOMSource(d.getDocumentElement()));
                     return type.cast(d);
                 }
                 // CAMEL-8410 Just make sure we get the Source object directly from the payload body source
@@ -189,28 +194,7 @@ public final class CxfPayloadConverter {
                 }
                 TypeConverter tc = registry.lookup(type, Source.class);
                 if (tc != null) {
-                    if ((s instanceof StreamSource
-                        || s instanceof SAXSource) 
-                        && !type.isAssignableFrom(Document.class)
-                        && !type.isAssignableFrom(Source.class)) {
-                        //non-reproducible sources, we need to convert to DOMSource first
-                        //or the payload will get wiped out
-                        Document d;
-                        try {
-                            d = StaxUtils.read(s);
-                        } catch (XMLStreamException e) {
-                            throw new RuntimeException(e);
-                        }
-                        s = new DOMSource(d.getDocumentElement());
-                        payload.getBodySources().set(0, s);
-                    }
-                    
                     T t = tc.convertTo(type, s);
-                    if (t instanceof Document) {
-                        payload.getBodySources().set(0, new DOMSource(((Document)t).getDocumentElement()));
-                    } else if (t instanceof Source) {
-                        payload.getBodySources().set(0, (Source)t);
-                    }
                     return t;
                 }                
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
new file mode 100644
index 0000000..6c7618b
--- /dev/null
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CachedCxfPayloadTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.converter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.StringReader;
+
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.sax.SAXSource;
+import javax.xml.transform.stax.StAXSource;
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.TypeConversionException;
+import org.apache.camel.component.cxf.CxfPayload;
+import org.apache.camel.converter.jaxp.XmlConverter;
+import org.apache.camel.test.junit4.ExchangeTestSupport;
+import org.apache.cxf.staxutils.StaxSource;
+import org.apache.cxf.staxutils.StaxUtils;
+import org.junit.Test;
+
+public class CachedCxfPayloadTest extends ExchangeTestSupport {
+    private static final String PAYLOAD = "<foo>bar</foo>";
+
+    @Test
+    public void testCachedCxfPayloadSAXSource() throws TypeConversionException, NoTypeConversionAvailableException, IOException {
+        SAXSource source = context.getTypeConverter().mandatoryConvertTo(SAXSource.class, PAYLOAD);
+        doTest(source);
+    }
+
+    @Test
+    public void testCachedCxfPayloadStAXSource() throws TypeConversionException, NoTypeConversionAvailableException, IOException {
+        StAXSource source = context.getTypeConverter().mandatoryConvertTo(StAXSource.class, PAYLOAD);
+        doTest(source);
+    }
+
+    @Test
+    public void testCachedCxfPayloadStaxSource() throws TypeConversionException, NoTypeConversionAvailableException, IOException {
+        XMLStreamReader streamReader = StaxUtils.createXMLStreamReader(new StreamSource(new StringReader(PAYLOAD)));
+        StaxSource source = new StaxSource(streamReader);
+        doTest(source);
+    }
+
+    @Test
+    public void testCachedCxfPayloadDOMSource() throws TypeConversionException, NoTypeConversionAvailableException, IOException {
+        DOMSource source = context.getTypeConverter().mandatoryConvertTo(DOMSource.class, PAYLOAD);
+        doTest(source);
+    }
+
+    @Test
+    public void testCachedCxfPayloadStreamSource() throws TypeConversionException, NoTypeConversionAvailableException, IOException {
+        StreamSource source = context.getTypeConverter().mandatoryConvertTo(StreamSource.class, PAYLOAD);
+        doTest(source);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void doTest(Object source) throws IOException {
+        CxfPayload<?> originalPayload = context.getTypeConverter().convertTo(CxfPayload.class, source);
+        CachedCxfPayload<?> cache = new CachedCxfPayload(originalPayload, exchange, new XmlConverter());
+
+        assertTrue(cache.inMemory());
+
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        cache.writeTo(bos);
+
+        String s = context.getTypeConverter().convertTo(String.class, bos);
+        assertEquals(PAYLOAD, s);
+
+        cache.reset();
+
+        CachedCxfPayload clone = (CachedCxfPayload) cache.copy();
+        bos = new ByteArrayOutputStream();
+        clone.writeTo(bos);
+
+        s = context.getTypeConverter().convertTo(String.class, bos);
+        assertEquals(PAYLOAD, s);
+
+        cache.reset();
+        clone.reset();
+
+        s = context.getTypeConverter().convertTo(String.class, cache);
+        assertEquals(PAYLOAD, s);
+
+        s = context.getTypeConverter().convertTo(String.class, clone);
+        assertEquals(PAYLOAD, s);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayLoadStreamCacheRouterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayLoadStreamCacheRouterTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayLoadStreamCacheRouterTest.java
new file mode 100644
index 0000000..1262ca9
--- /dev/null
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayLoadStreamCacheRouterTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.converter;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.StreamCache;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cxf.CxfSimpleRouterTest;
+
+/**
+ * A unit test for testing reading SOAP body in PAYLOAD mode.
+ * 
+ * @version 
+ */
+public class CxfPayLoadStreamCacheRouterTest extends CxfSimpleRouterTest {
+
+    private String routerEndpointURI = "cxf://" + getRouterAddress() + "?" + SERVICE_CLASS + "&dataFormat=PAYLOAD";
+    private String serviceEndpointURI = "cxf://" + getServiceAddress() + "?" + SERVICE_CLASS + "&dataFormat=PAYLOAD";
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                // START SNIPPET: payload
+                from(routerEndpointURI).streamCaching().process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        Object payload = exchange.getIn().getBody();
+                        assertTrue("payload is not a StreamCache", payload instanceof StreamCache);
+                    }                    
+                })
+                .to(serviceEndpointURI);
+                // END SNIPPET: payload
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayloadConverterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayloadConverterTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayloadConverterTest.java
index b969073..17ffc16 100644
--- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayloadConverterTest.java
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/CxfPayloadConverterTest.java
@@ -31,7 +31,7 @@ import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
-
+import org.apache.camel.StreamCache;
 import org.apache.camel.component.cxf.CxfPayload;
 import org.apache.camel.test.junit4.ExchangeTestSupport;
 import org.junit.Before;
@@ -86,7 +86,14 @@ public class CxfPayloadConverterTest extends ExchangeTestSupport {
         assertNotNull(nodeList);
         assertEquals("Get a worng size of nodeList", 1,  nodeList.getLength());
     }
-   
+
+    @Test
+    public void testCxfPayloadToStreamCache() {
+        StreamCache streamCache = CxfPayloadConverter.cxfPayLoadToStreamCache(payload, exchange);
+        assertNotNull(streamCache);
+        assertTrue(streamCache instanceof CxfPayload);
+    }
+
     @Test
     public void testToCxfPayload() {
         // use default type converter

http://git-wip-us.apache.org/repos/asf/camel/blob/5d94b2a8/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/PayLoadConvertToPOJOTest.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/PayLoadConvertToPOJOTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/PayLoadConvertToPOJOTest.java
index c62c7d5..d7b1b08 100644
--- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/PayLoadConvertToPOJOTest.java
+++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/converter/PayLoadConvertToPOJOTest.java
@@ -88,7 +88,7 @@ public class PayLoadConvertToPOJOTest extends CamelTestSupport {
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("cxf:bean:routerEndpoint?dataFormat=PAYLOAD").process(new Processor() {
+                from("cxf:bean:routerEndpoint?dataFormat=PAYLOAD").streamCaching().process(new Processor() {
 
                     @Override
                     public void process(Exchange exchange) throws Exception {