You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/04/01 15:35:01 UTC

svn commit: r760886 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/converter/jaxp/ camel-core/src/main/java/org/apache/camel/converter/stream/ camel-core/src/main/java/org/apache/camel/impl/ came...

Author: davsclaus
Date: Wed Apr  1 13:34:56 2009
New Revision: 760886

URL: http://svn.apache.org/viewvc?rev=760886&view=rev
Log:
CAMEL-1503: StreamCache can now write to an output stream allowing stremas to be serialized when transfered over JMS or Mina etc. This allows using Camel with SMX where Camel gets XML stream from SMX and Camel send it to a JMS queue without having to worry about type convertions beforehand.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/StreamCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/BytesSource.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/StreamCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/StreamCache.java?rev=760886&r1=760885&r2=760886&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/StreamCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/StreamCache.java Wed Apr  1 13:34:56 2009
@@ -16,6 +16,9 @@
  */
 package org.apache.camel;
 
+import java.io.IOException;
+import java.io.OutputStream;
+
 /**
  * Tagging interface to indicate that a type is capable of caching the underlying data stream.
  * <p/>
@@ -32,4 +35,11 @@
      */
     void reset();
 
+    /**
+     * Writes the stream to the given output
+     *
+     * @param os the destination to write to
+     */
+    void writeTo(OutputStream os) throws IOException;
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/BytesSource.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/BytesSource.java?rev=760886&r1=760885&r2=760886&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/BytesSource.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/BytesSource.java Wed Apr  1 13:34:56 2009
@@ -20,7 +20,7 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
-
+import java.io.Serializable;
 import javax.xml.transform.stream.StreamSource;
 
 import org.apache.camel.util.ObjectHelper;
@@ -31,8 +31,11 @@
  *
  * @version $Revision$
  */
-public class BytesSource extends StreamSource {
-    private final byte[] data;
+public class BytesSource extends StreamSource implements Serializable {
+    private byte[] data;
+
+    public BytesSource() {
+    }
 
     public BytesSource(byte[] data) {
         ObjectHelper.notNull(data, "data");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java?rev=760886&r1=760885&r2=760886&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java Wed Apr  1 13:34:56 2009
@@ -21,9 +21,11 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StreamCache;
+import org.apache.camel.util.IOHelper;
 
 public class FileInputStreamCache extends InputStream implements StreamCache {
     private InputStream stream;
@@ -61,7 +63,11 @@
             throw new RuntimeCamelException(e);
         }            
     }
-    
+
+    public void writeTo(OutputStream os) throws IOException {
+        IOHelper.copy(getInputStream(), os);
+    }
+
     @Override
     public int available() throws IOException {
         return getInputStream().available();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java?rev=760886&r1=760885&r2=760886&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java Wed Apr  1 13:34:56 2009
@@ -14,13 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.converter.stream;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 
 import org.apache.camel.StreamCache;
-
+import org.apache.camel.util.IOHelper;
 
 public class InputStreamCache extends ByteArrayInputStream implements StreamCache {
 
@@ -28,4 +29,8 @@
         super(data);
     }
 
+    public void writeTo(OutputStream os) throws IOException {
+        IOHelper.copy(this, os);
+    }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java?rev=760886&r1=760885&r2=760886&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java Wed Apr  1 13:34:56 2009
@@ -17,6 +17,7 @@
 package org.apache.camel.converter.stream;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.StringReader;
 
 import org.apache.camel.StreamCache;
@@ -50,6 +51,10 @@
         }
     }
 
+    public void writeTo(OutputStream os) throws IOException {
+        os.write(data.getBytes());
+    }
+
     String getData() {
         return data;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java?rev=760886&r1=760885&r2=760886&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java Wed Apr  1 13:34:56 2009
@@ -16,8 +16,12 @@
  */
 package org.apache.camel.converter.stream;
 
+import java.io.IOException;
+import java.io.OutputStream;
+
 import org.apache.camel.StreamCache;
 import org.apache.camel.converter.jaxp.StringSource;
+import org.apache.camel.util.IOHelper;
 
 /**
  * {@link org.apache.camel.StreamCache} implementation for {@link org.apache.camel.converter.jaxp.StringSource}s
@@ -37,4 +41,8 @@
         // do nothing here
     }
 
+    public void writeTo(OutputStream os) throws IOException {
+        IOHelper.copy(getInputStream(), os);
+    }
+
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java?rev=760886&r1=760885&r2=760886&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java Wed Apr  1 13:34:56 2009
@@ -16,21 +16,20 @@
  */
 package org.apache.camel.converter.stream;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
+import java.io.Serializable;
 import javax.xml.transform.TransformerException;
 import javax.xml.transform.sax.SAXSource;
 import javax.xml.transform.stream.StreamSource;
 
 import org.apache.camel.Converter;
 import org.apache.camel.Exchange;
-import org.apache.camel.FallbackConverter;
 import org.apache.camel.StreamCache;
-import org.apache.camel.TypeConverter;
 import org.apache.camel.converter.jaxp.BytesSource;
 import org.apache.camel.converter.jaxp.StringSource;
-import org.apache.camel.spi.TypeConverterRegistry;
 import org.apache.camel.util.IOHelper;
 
 /**
@@ -77,4 +76,18 @@
         return new ReaderCache(data);
     }
 
+    @Converter
+    public Serializable convertToSerializable(StreamCache cache, Exchange exchange) throws IOException {
+        byte[] data = convertToByteArray(cache, exchange);
+        return new BytesSource(data);
+    }
+
+    @Converter
+    public byte[] convertToByteArray(StreamCache cache, Exchange exchange) throws IOException {
+        // lets serialize it as a byte array
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        cache.writeTo(os);
+        return os.toByteArray();
+    }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java?rev=760886&r1=760885&r2=760886&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java Wed Apr  1 13:34:56 2009
@@ -19,8 +19,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.Serializable;
-import java.nio.CharBuffer;
+import java.io.OutputStream;
 import javax.xml.transform.stream.StreamSource;
 
 import org.apache.camel.Exchange;
@@ -33,12 +32,9 @@
  */
 public class StreamSourceCache extends StreamSource implements StreamCache {
 
-    private transient InputStream stream;
-    private StreamCache streamCache;
-    private ReaderCache readCache;
-
-    public StreamSourceCache() {
-    }
+    private final InputStream stream;
+    private final StreamCache streamCache;
+    private final ReaderCache readCache;
 
     public StreamSourceCache(StreamSource source, Exchange exchange) throws IOException {
         if (source.getInputStream() != null) {
@@ -46,12 +42,19 @@
             CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
             IOHelper.copyAndCloseInput(source.getInputStream(), cos);
             streamCache = cos.getStreamCache();
+            readCache = null;
             setSystemId(source.getSystemId());
-        }
-        if (source.getReader() != null) {
+            stream = (InputStream) streamCache;
+        } else if (source.getReader() != null) {
             String data = exchange.getContext().getTypeConverter().convertTo(String.class, source.getReader());
             readCache = new ReaderCache(data);
+            streamCache = null;
             setReader(readCache);
+            stream = new ByteArrayInputStream(data.getBytes());
+        } else {
+            streamCache = null;
+            readCache = null;
+            stream = null;
         }
     }
 
@@ -71,16 +74,16 @@
         }
     }
 
+    public void writeTo(OutputStream os) throws IOException {
+        if (streamCache != null) {
+            streamCache.writeTo(os);
+        } else if (readCache != null) {
+            readCache.writeTo(os);
+        }
+    }
+
     @Override
     public InputStream getInputStream() {
-        if (stream == null) {
-            if (streamCache instanceof InputStream) {
-                stream = (InputStream) streamCache;
-            } else if (readCache != null) {
-                String data = readCache.getData();
-                stream = new ByteArrayInputStream(data.getBytes());
-            }
-        }
         return stream;
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java?rev=760886&r1=760885&r2=760886&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java Wed Apr  1 13:34:56 2009
@@ -68,17 +68,17 @@
     public static DefaultExchangeHolder marshal(Exchange exchange) {
         DefaultExchangeHolder payload = new DefaultExchangeHolder();
 
-        payload.inBody = checkSerializableObject("in body", exchange.getIn().getBody());
-        payload.inHeaders.putAll(checkMapSerializableObjects("in headers", exchange.getIn().getHeaders()));
+        payload.inBody = checkSerializableObject("in body", exchange, exchange.getIn().getBody());
+        payload.inHeaders.putAll(checkMapSerializableObjects("in headers", exchange, exchange.getIn().getHeaders()));
         if (exchange.getOut(false) != null) {
-            payload.outBody = checkSerializableObject("out body", exchange.getOut().getBody());
-            payload.outHeaders.putAll(checkMapSerializableObjects("out headers", exchange.getOut().getHeaders()));
+            payload.outBody = checkSerializableObject("out body", exchange, exchange.getOut().getBody());
+            payload.outHeaders.putAll(checkMapSerializableObjects("out headers", exchange, exchange.getOut().getHeaders()));
         }
         if (exchange.getFault(false) != null) {
-            payload.faultBody = checkSerializableObject("fault body", exchange.getFault().getBody());
-            payload.faultHeaders.putAll(checkMapSerializableObjects("fault headers", exchange.getFault().getHeaders()));
+            payload.faultBody = checkSerializableObject("fault body", exchange, exchange.getFault().getBody());
+            payload.faultHeaders.putAll(checkMapSerializableObjects("fault headers", exchange, exchange.getFault().getHeaders()));
         }
-        payload.properties.putAll(checkMapSerializableObjects("exchange properties", exchange.getProperties()));
+        payload.properties.putAll(checkMapSerializableObjects("exchange properties", exchange, exchange.getProperties()));
         payload.exception = exchange.getException();
 
         return payload;
@@ -116,26 +116,32 @@
         return sb.append(']').toString();
     }
 
-    private static Object checkSerializableObject(String type, Object object) {
-        if (object instanceof Serializable) {
-            return object;
+    private static Object checkSerializableObject(String type, Exchange exchange, Object object) {
+        if (object == null) {
+            return null;
+        }
+
+        Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, object);
+        if (converted != null) {
+            return converted;
         } else {
-            LOG.warn(type + " containig object " + object + " cannot be serialized, it will be excluded by the holder");
+            LOG.warn(type + " containig object: " + object + " of type: " + object.getClass().getCanonicalName() + " cannot be serialized, it will be excluded by the holder");
             return null;
         }
     }
 
-    private static Map<String, Object> checkMapSerializableObjects(String type, Map<String, Object> map) {
+    private static Map<String, Object> checkMapSerializableObjects(String type, Exchange exchange, Map<String, Object> map) {
         if (map == null) {
             return null;
         }
 
         Map<String, Object> result = new LinkedHashMap<String, Object>();
         for (Map.Entry<String, Object> entry : map.entrySet()) {
-            if (entry.getValue() instanceof Serializable) {
-                result.put(entry.getKey(), entry.getValue());
+            Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, entry.getValue());
+            if (converted != null) {
+                result.put(entry.getKey(), converted);
             } else {
-                LOG.warn(type + " containing object " + entry.getValue() + " of key " + entry.getKey()
+                LOG.warn(type + " containing object: " + entry.getValue() + " with key: " + entry.getKey()
                         + " cannot be serialized, it will be excluded by the holder");
             }
         }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java?rev=760886&r1=760885&r2=760886&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java Wed Apr  1 13:34:56 2009
@@ -16,8 +16,10 @@
  */
 package org.apache.camel.util;
 
-import junit.framework.TestCase;
+import java.io.IOException;
+import java.io.OutputStream;
 
+import junit.framework.TestCase;
 import org.apache.camel.Message;
 import org.apache.camel.StreamCache;
 import org.apache.camel.impl.DefaultMessage;
@@ -39,7 +41,7 @@
      */
     public void testResetStreamCache() throws Exception {
         // should not throw exceptions when Message or message body is null
-        MessageHelper.resetStreamCache((Message) null);
+        MessageHelper.resetStreamCache(null);
         MessageHelper.resetStreamCache(message);
         
         // handle StreamCache
@@ -48,6 +50,10 @@
             public void reset() {
                 reset.set(Boolean.TRUE);
             }
+
+            public void writeTo(OutputStream os) throws IOException {
+                // noop
+            }
         });
         MessageHelper.resetStreamCache(message);
         assertTrue("Should have reset the stream cache", reset.get());

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=760886&r1=760885&r2=760886&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Wed Apr  1 13:34:56 2009
@@ -42,6 +42,7 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.StreamCache;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.impl.DefaultExchangeHolder;
 import org.apache.camel.spi.HeaderFilterStrategy;
@@ -353,7 +354,7 @@
             if (body instanceof Node || body instanceof String) {
                 type = Text;
             } else if (body instanceof byte[] || body instanceof GenericFile || body instanceof File || body instanceof Reader
-                    || body instanceof InputStream || body instanceof ByteBuffer) {
+                    || body instanceof InputStream || body instanceof ByteBuffer || body instanceof StreamCache) {
                 type = Bytes;
             } else if (body instanceof Map) {
                 type = Map;

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java?rev=760886&r1=760885&r2=760886&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java Wed Apr  1 13:34:56 2009
@@ -41,7 +41,7 @@
     private static final String TEST_LONDON = "src/test/data/message1.xml";
     private static final String TEST_TAMPA = "src/test/data/message2.xml";
 
-    public void testLondonWithFileStream() throws Exception {
+    public void testLondonWithFileStreamAsObject() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:london");
         mock.expectedMessageCount(1);
         mock.message(0).bodyAs(String.class).contains("James");
@@ -49,12 +49,38 @@
         Source source = new StreamSource(new FileInputStream(TEST_LONDON));
         assertNotNull(source);
 
-        template.sendBody("direct:start", source);
+        template.sendBody("direct:object", source);
 
         assertMockEndpointsSatisfied();
     }
 
-    public void testTampaWithFileStream() throws Exception {
+    public void testLondonWithFileStreamAsBytes() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:london");
+        mock.expectedMessageCount(1);
+        mock.message(0).bodyAs(String.class).contains("James");
+
+        Source source = new StreamSource(new FileInputStream(TEST_LONDON));
+        assertNotNull(source);
+
+        template.sendBody("direct:bytes", source);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testLondonWithFileStreamAsDefault() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:london");
+        mock.expectedMessageCount(1);
+        mock.message(0).bodyAs(String.class).contains("James");
+
+        Source source = new StreamSource(new FileInputStream(TEST_LONDON));
+        assertNotNull(source);
+
+        template.sendBody("direct:default", source);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testTampaWithFileStreamAsObject() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:tampa");
         mock.expectedMessageCount(1);
         mock.message(0).bodyAs(String.class).contains("Hiram");
@@ -62,12 +88,38 @@
         Source source = new StreamSource(new FileInputStream(TEST_TAMPA));
         assertNotNull(source);
 
-        template.sendBody("direct:start", source);
+        template.sendBody("direct:object", source);
 
         assertMockEndpointsSatisfied();
     }
 
-    public void testLondonWithStringSource() throws Exception {
+    public void testTampaWithFileStreamAsBytes() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:tampa");
+        mock.expectedMessageCount(1);
+        mock.message(0).bodyAs(String.class).contains("Hiram");
+
+        Source source = new StreamSource(new FileInputStream(TEST_TAMPA));
+        assertNotNull(source);
+
+        template.sendBody("direct:bytes", source);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testTampaWithFileStreamAsDefault() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:tampa");
+        mock.expectedMessageCount(1);
+        mock.message(0).bodyAs(String.class).contains("Hiram");
+
+        Source source = new StreamSource(new FileInputStream(TEST_TAMPA));
+        assertNotNull(source);
+
+        template.sendBody("direct:default", source);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testLondonWithStringSourceAsObject() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:london");
         mock.expectedMessageCount(1);
         mock.message(0).bodyAs(String.class).contains("James");
@@ -79,11 +131,44 @@
                 + "</person>");
         assertNotNull(source);
 
-        template.sendBody("direct:start", source);
+        template.sendBody("direct:object", source);
 
         assertMockEndpointsSatisfied();
     }
 
+    public void testLondonWithStringSourceAsBytes() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:london");
+        mock.expectedMessageCount(1);
+        mock.message(0).bodyAs(String.class).contains("James");
+
+        Source source = new StringSource("<person user=\"james\">\n"
+                + "  <firstName>James</firstName>\n"
+                + "  <lastName>Strachan</lastName>\n"
+                + "  <city>London</city>\n"
+                + "</person>");
+        assertNotNull(source);
+
+        template.sendBody("direct:bytes", source);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testLondonWithStringSourceAsDefault() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:london");
+        mock.expectedMessageCount(1);
+        mock.message(0).bodyAs(String.class).contains("James");
+
+        Source source = new StringSource("<person user=\"james\">\n"
+                + "  <firstName>James</firstName>\n"
+                + "  <lastName>Strachan</lastName>\n"
+                + "  <city>London</city>\n"
+                + "</person>");
+        assertNotNull(source);
+
+        template.sendBody("direct:default", source);
+
+        assertMockEndpointsSatisfied();
+    }
 
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
@@ -99,16 +184,37 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").to("activemq:queue:foo");
+                // no need to convert to String as JMS producer can handle XML streams now
+                from("direct:object").to("activemq:queue:object?jmsMessageType=Object");
 
-                from("activemq:queue:foo")
+                // no need to convert to String as JMS producer can handle XML streams now
+                from("direct:bytes").to("activemq:queue:bytes?jmsMessageType=Bytes");
+
+                // no need to convert to String as JMS producer can handle XML streams now
+                from("direct:default").to("activemq:queue:default");
+
+                from("activemq:queue:object")
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
                             Object body = exchange.getIn().getBody();
                             // should preserve the object as Source
                             assertIsInstanceOf(Source.class, body);
                         }
-                    })
+                    }).to("seda:choice");
+
+                from("activemq:queue:bytes")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            Object body = exchange.getIn().getBody();
+                            // should be a byte array by default
+                            assertIsInstanceOf(byte[].class, body);
+                        }
+                    }).to("seda:choice");
+
+                from("activemq:queue:default")
+                    .to("seda:choice");
+
+                from("seda:choice")
                     .choice()
                         .when().xpath("/person/city = 'London'").to("mock:london")
                         .when().xpath("/person/city = 'Tampa'").to("mock:tampa")
@@ -118,5 +224,4 @@
         };
     }
 
-
 }