You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/04/01 11:55:00 UTC

svn commit: r760833 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/converter/ camel-core/src/main/java/org/apache/camel/converter/jaxp/ camel-core/src/main/java/org/apache/camel/converter/stream/ camel-core/src/test/java/org/apache/camel/...

Author: davsclaus
Date: Wed Apr  1 09:54:55 2009
New Revision: 760833

URL: http://svn.apache.org/viewvc?rev=760833&view=rev
Log:
CAMEL-1503: First part of doing SMX-Camel with XML streams easier.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java   (with props)
    camel/trunk/components/camel-jms/src/test/data/
    camel/trunk/components/camel-jms/src/test/data/message1.xml   (with props)
    camel/trunk/components/camel-jms/src/test/data/message2.xml   (with props)
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/ObjectConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/XmlConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageType.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java Wed Apr  1 09:54:55 2009
@@ -42,6 +42,7 @@
 import java.io.UnsupportedEncodingException;
 import java.io.Writer;
 import java.net.URL;
+import java.nio.CharBuffer;
 import javax.xml.transform.TransformerException;
 import javax.xml.transform.dom.DOMSource;
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/ObjectConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/ObjectConverter.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/ObjectConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/ObjectConverter.java Wed Apr  1 09:54:55 2009
@@ -16,11 +16,15 @@
  */
 package org.apache.camel.converter;
 
+import java.io.UnsupportedEncodingException;
 import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Some core java.lang based <a
@@ -31,6 +35,8 @@
 @Converter
 public final class ObjectConverter {
 
+    private static final transient Log LOG = LogFactory.getLog(ObjectConverter.class);
+
     /**
      * Utility classes should not have a public constructor.
      */
@@ -102,8 +108,23 @@
     }
 
     @Converter
-    public static byte[] toByteArray(String value) {
-        return value.getBytes();
+    public static byte[] toByteArray(String value, Exchange exchange) {
+        byte[] bytes = null;
+        if (exchange != null) {
+            String charsetName = exchange.getProperty(Exchange.CHARSET_NAME, String.class);
+            if (charsetName != null) {
+                try {
+                    bytes = value.getBytes(charsetName);
+                } catch (UnsupportedEncodingException e) {
+                    LOG.warn("Cannot convert the String to bytes with the charset " + charsetName, e);
+                }
+            }
+        }
+        // fall back to the platform default charset when no charset is set or it is unsupported
+        if (bytes == null) {
+            bytes = value.getBytes();
+        }
+        return bytes;
     }
 
     @Converter

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/XmlConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/XmlConverter.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/XmlConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/jaxp/XmlConverter.java Wed Apr  1 09:54:55 2009
@@ -180,6 +180,19 @@
     }
 
     /**
+     * Converts the given input Source into bytes
+     */
+    @Converter
+    public byte[] toByteArray(Source source, Exchange exchange) throws TransformerException {
+        String answer = toString(source);
+        if (exchange != null) {
+            return exchange.getContext().getTypeConverter().convertTo(byte[].class, exchange, answer);
+        } else {
+            return answer.getBytes();
+        }
+    }
+
+    /**
      * Converts the given input Node into text
      */
     @Converter

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java Wed Apr  1 09:54:55 2009
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -50,22 +51,16 @@
     protected boolean outputLocked;
     protected OutputStream currentStream;
 
+    private final List<Object> streamList = new ArrayList<Object>();
     private long threshold = 64 * 1024;
-
     private int totalLength;
-
-    private boolean inmem;
-
+    private boolean inMemory;
     private File tempFile;
+    private File outputDir;
 
-    private File outputDir;   
-    
-    private List<Object> streamList = new ArrayList<Object>();
-
-    
     public CachedOutputStream() {
         currentStream = new ByteArrayOutputStream(2048);
-        inmem = true;
+        inMemory = true;
     }
 
     public CachedOutputStream(long threshold) {
@@ -100,7 +95,6 @@
      * output stream ... etc.)
      */
     protected void doFlush() throws IOException {
-        
     }
 
     public void flush() throws IOException {
@@ -112,14 +106,12 @@
      * Perform any actions required on stream closure (handle response etc.)
      */
     protected void doClose() throws IOException {
-        
     }
     
     /**
      * Perform any actions required after stream closure (close the other related stream etc.)
      */
     protected void postClose() throws IOException {
-        
     }
 
     /**
@@ -166,14 +158,14 @@
             InputStream in = ac.getInputStream();
             IOHelper.copyAndCloseInput(in, out);
         } else {
-            if (inmem) {
+            if (inMemory) {
                 if (currentStream instanceof ByteArrayOutputStream) {
                     ByteArrayOutputStream byteOut = (ByteArrayOutputStream) currentStream;
                     if (copyOldContent && byteOut.size() > 0) {
                         byteOut.writeTo(out);
                     }
                 } else {
-                    throw new IOException("Unknown format of currentStream");
+                    throw new IOException("Unknown format of currentStream: " + currentStream);
                 }
             } else {
                 // read the file
@@ -185,7 +177,7 @@
                 streamList.remove(currentStream);
                 tempFile.delete();
                 tempFile = null;
-                inmem = true;
+                inMemory = true;
             }
         }
         currentStream = out;
@@ -202,7 +194,7 @@
     
     public byte[] getBytes() throws IOException {
         flush();
-        if (inmem) {
+        if (inMemory) {
             if (currentStream instanceof ByteArrayOutputStream) {
                 return ((ByteArrayOutputStream)currentStream).toByteArray();
             } else {
@@ -217,7 +209,7 @@
     
     public void writeCacheTo(OutputStream out) throws IOException {
         flush();
-        if (inmem) {
+        if (inMemory) {
             if (currentStream instanceof ByteArrayOutputStream) {
                 ((ByteArrayOutputStream)currentStream).writeTo(out);
             } else {
@@ -240,12 +232,12 @@
         }
         
         int count = 0;
-        if (inmem) {
+        if (inMemory) {
             if (currentStream instanceof ByteArrayOutputStream) {
                 byte bytes[] = ((ByteArrayOutputStream)currentStream).toByteArray();
                 out.append(IOHelper.newStringFromBytes(bytes, 0, limit));
             } else {
-                throw new IOException("Unknown format of currentStream");
+                throw new IOException("Unknown format of currentStream: " + currentStream);
             }
         } else {
             // read the file
@@ -270,12 +262,12 @@
     }
     public void writeCacheTo(StringBuilder out) throws IOException {
         flush();
-        if (inmem) {
+        if (inMemory) {
             if (currentStream instanceof ByteArrayOutputStream) {
                 byte[] bytes = ((ByteArrayOutputStream)currentStream).toByteArray();
                 out.append(IOHelper.newStringFromBytes(bytes));
             } else {
-                throw new IOException("Unknown format of currentStream");
+                throw new IOException("Unknown format of currentStream: " + currentStream);
             }
         } else {
             // read the file
@@ -315,14 +307,13 @@
     }
 
     protected void onWrite() throws IOException {
-        
     }
 
     public void write(byte[] b, int off, int len) throws IOException {
         if (!outputLocked) {
             onWrite();
             this.totalLength += len;
-            if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+            if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
                 createFileOutputStream();
             }
             currentStream.write(b, off, len);
@@ -333,7 +324,7 @@
         if (!outputLocked) {
             onWrite();
             this.totalLength += b.length;
-            if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+            if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
                 createFileOutputStream();
             }
             currentStream.write(b);
@@ -344,7 +335,7 @@
         if (!outputLocked) {
             onWrite();
             this.totalLength++;
-            if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+            if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
                 createFileOutputStream();
             }
             currentStream.write(b);
@@ -361,7 +352,7 @@
         
         currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
         bout.writeTo(currentStream);
-        inmem = false;
+        inMemory = false;
         streamList.add(currentStream);
     }
 
@@ -371,7 +362,7 @@
 
     public InputStream getInputStream() throws IOException {
         flush();
-        if (inmem) {
+        if (inMemory) {
             if (currentStream instanceof ByteArrayOutputStream) {
                 return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray());
             } else {
@@ -388,14 +379,14 @@
                 streamList.add(fileInputStream);
                 return fileInputStream;
             } catch (FileNotFoundException e) {
-                throw new IOException("Cached file was deleted, " + e.toString());
+                throw IOHelper.createIOException("Cached file was already deleted", e);
             }
         }
     }
     
     public StreamCache getStreamCache() throws IOException {
         flush();
-        if (inmem) {
+        if (inMemory) {
             if (currentStream instanceof ByteArrayOutputStream) {
                 return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
             } else {
@@ -403,27 +394,27 @@
             }
         } else {
             try {
-                FileInputStreamCache fileInputStream = new FileInputStreamCache(tempFile, this);
-                return fileInputStream;
+                return new FileInputStreamCache(tempFile, this);
             } catch (FileNotFoundException e) {
-                throw new IOException("Cached file was deleted, " + e.toString());
+                throw IOHelper.createIOException("Cached file was already deleted", e);
             }
         }
     }
     
     private void maybeDeleteTempFile(Object stream) {        
         streamList.remove(stream);        
-        if (!inmem && tempFile != null && streamList.isEmpty()) {            
+        if (!inMemory && tempFile != null && streamList.isEmpty()) {
             tempFile.delete();
             tempFile = null;
             currentStream = new ByteArrayOutputStream(1024);
-            inmem = true;
+            inMemory = true;
         }
     }
 
     public void setOutputDir(File outputDir) throws IOException {
         this.outputDir = outputDir;
     }
+
     public void setThreshold(long threshold) {
         this.threshold = threshold;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java Wed Apr  1 09:54:55 2009
@@ -26,41 +26,57 @@
 import org.apache.camel.StreamCache;
 
 public class FileInputStreamCache extends InputStream implements StreamCache {
-    private FileInputStream inputStream;
+    private InputStream stream;
     private CachedOutputStream cachedOutputStream;
     private File file;
 
+    public FileInputStreamCache() {
+    }
+
     public FileInputStreamCache(File file, CachedOutputStream cos) throws FileNotFoundException {
         this.file = file;
-        cachedOutputStream = cos;
-        inputStream = new FileInputStream(file);       
+        this.cachedOutputStream = cos;
+        this.stream = new FileInputStream(file);
     }
     
+    @Override
     public void close() {
         try {
-            inputStream.close();
-            cachedOutputStream.close();
-        } catch (Exception exception) {
-            throw new RuntimeCamelException(exception);
+            getInputStream().close();
+            if (cachedOutputStream != null) {
+                cachedOutputStream.close();
+            }
+        } catch (Exception e) {
+            throw new RuntimeCamelException(e);
         } 
     }
 
-    public void reset() {            
+    @Override
+    public void reset() {
         try {
-            inputStream.close();            
-            inputStream = new FileInputStream(file);
-        } catch (Exception exception) {
-            throw new RuntimeCamelException(exception);
+            getInputStream().close();
+            // reset by creating a new stream based on the file
+            stream = new FileInputStream(file);
+        } catch (Exception e) {
+            throw new RuntimeCamelException(e);
         }            
     }
     
+    @Override
     public int available() throws IOException {
-        return inputStream.available();
+        return getInputStream().available();
     }
 
     @Override
     public int read() throws IOException {
-        return inputStream.read();
-    }   
-    
-}   
+        return getInputStream().read();
+    }
+
+    protected InputStream getInputStream() throws FileNotFoundException {
+        if (file != null && stream == null) {
+            stream = new FileInputStream(file);
+        }
+        return stream;
+    }
+
+}

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java?rev=760833&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java Wed Apr  1 09:54:55 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+import org.apache.camel.StreamCache;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * {@link org.apache.camel.StreamCache} implementation for caching {@link java.io.Reader}s
+ */
+public class ReaderCache extends StringReader implements StreamCache {
+
+    private static final transient Log LOG = LogFactory.getLog(ReaderCache.class);
+
+    private String data;
+
+    public ReaderCache(String data) {
+        super(data);
+        this.data = data;
+    }
+
+    public void close() {
+        // Do not release the string for caching
+    }
+
+    @Override
+    public void reset() {
+        try {
+            super.reset();
+        } catch (IOException e) {
+            LOG.warn("Cannot reset cache", e);
+        }
+    }
+
+    String getData() {
+        return data;
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/ReaderCache.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java?rev=760833&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java Wed Apr  1 09:54:55 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import org.apache.camel.StreamCache;
+import org.apache.camel.converter.jaxp.StringSource;
+
+/**
+ * {@link org.apache.camel.StreamCache} implementation for {@link org.apache.camel.converter.jaxp.StringSource}s
+ */
+public class SourceCache extends StringSource implements StreamCache {
+
+    private static final long serialVersionUID = 1L;
+
+    public SourceCache() {
+    }
+
+    public SourceCache(String data) {
+        // delegate to the StringSource constructor so this cache actually carries the data
+        super(data);
+    }
+
+    public void reset() {
+        // do nothing here
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/SourceCache.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java Wed Apr  1 09:54:55 2009
@@ -19,20 +19,19 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
-import java.io.StringReader;
-
 import javax.xml.transform.TransformerException;
 import javax.xml.transform.sax.SAXSource;
 import javax.xml.transform.stream.StreamSource;
 
 import org.apache.camel.Converter;
 import org.apache.camel.Exchange;
+import org.apache.camel.FallbackConverter;
 import org.apache.camel.StreamCache;
+import org.apache.camel.TypeConverter;
 import org.apache.camel.converter.jaxp.BytesSource;
 import org.apache.camel.converter.jaxp.StringSource;
+import org.apache.camel.spi.TypeConverterRegistry;
 import org.apache.camel.util.IOHelper;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * A set of {@link Converter} methods for wrapping stream-based messages in a {@link StreamCache}
@@ -40,25 +39,24 @@
  */
 @Converter
 public class StreamCacheConverter {
-    private static final transient Log LOG = LogFactory.getLog(StreamCacheConverter.class);
 
     @Converter
     public StreamCache convertToStreamCache(StreamSource source, Exchange exchange) throws IOException {
         return new StreamSourceCache(source, exchange);
     }
-    
+
     @Converter
     public StreamCache convertToStreamCache(StringSource source) {
         //no need to do stream caching for a StringSource
         return null;
     }
-    
+
     @Converter
     public StreamCache convertToStreamCache(BytesSource source) {
         //no need to do stream caching for a BytesSource
         return null;
     }
-    
+
     @Converter
     public StreamCache convertToStreamCache(SAXSource source, Exchange exchange) throws TransformerException {
         String data = exchange.getContext().getTypeConverter().convertTo(String.class, source);
@@ -69,7 +67,7 @@
     public StreamCache convertToStreamCache(InputStream stream, Exchange exchange) throws IOException {
         // set up CachedOutputStream with the properties
         CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
-        IOHelper.copyAndCloseInput(stream, cos);       
+        IOHelper.copyAndCloseInput(stream, cos);
         return cos.getStreamCache();
     }
 
@@ -79,81 +77,4 @@
         return new ReaderCache(data);
     }
 
-    /*
-     * {@link StreamCache} implementation for {@link Source}s
-     */
-    private class SourceCache extends StringSource implements StreamCache {
-
-        private static final long serialVersionUID = 4147248494104812945L;
-
-        public SourceCache() {
-        }
-
-        public SourceCache(String text) {
-            super(text);
-        }
-
-        public void reset() {
-            // do nothing here
-        }
-
-    }
-    
-    /*
-     * {@link StreamCache} implementation for Cache the StreamSource {@link StreamSource}s
-     */
-    private class StreamSourceCache extends StreamSource implements StreamCache {
-        StreamCache streamCache;
-        ReaderCache readCache;
-        
-        public StreamSourceCache(StreamSource source, Exchange exchange) throws IOException {
-            if (source.getInputStream() != null) {
-                // set up CachedOutputStream with the properties
-                CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
-                IOHelper.copyAndCloseInput(source.getInputStream(), cos);
-                streamCache = cos.getStreamCache();
-                setInputStream((InputStream)streamCache);
-                setSystemId(source.getSystemId());
-            }
-            if (source.getReader() != null) {
-                String data = exchange.getContext().getTypeConverter().convertTo(String.class, source.getReader());
-                readCache = new ReaderCache(data);
-                setReader(readCache);
-            }
-        }
-
-        public void reset() {
-            if (streamCache != null) {
-                streamCache.reset();
-            }
-            if (readCache != null) {
-                readCache.reset();
-            }            
-        }
-        
-    }
-      
-    private class ReaderCache extends StringReader implements StreamCache {
-
-        public ReaderCache(String s) {
-            super(s);
-        }
-
-        public void reset() {
-            try {
-                super.reset();
-            } catch (IOException e) {
-                LOG.warn("Cannot reset cache", e);
-            }
-        }
-
-        public void close() {
-            // Do not release the string for caching
-        }
-
-    }
-    
-    
-
-
 }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java?rev=760833&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java Wed Apr  1 09:54:55 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.nio.CharBuffer;
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.StreamCache;
+import org.apache.camel.util.IOHelper;
+
+/**
+ * {@link org.apache.camel.StreamCache} implementation for caching {@link javax.xml.transform.stream.StreamSource}s
+ */
+public class StreamSourceCache extends StreamSource implements StreamCache {
+
+    private transient InputStream stream;
+    private StreamCache streamCache;
+    private ReaderCache readCache;
+
+    public StreamSourceCache() {
+    }
+
+    public StreamSourceCache(StreamSource source, Exchange exchange) throws IOException {
+        if (source.getInputStream() != null) {
+            // set up CachedOutputStream with the properties
+            CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
+            IOHelper.copyAndCloseInput(source.getInputStream(), cos);
+            streamCache = cos.getStreamCache();
+            setSystemId(source.getSystemId());
+        }
+        if (source.getReader() != null) {
+            String data = exchange.getContext().getTypeConverter().convertTo(String.class, source.getReader());
+            readCache = new ReaderCache(data);
+            setReader(readCache);
+        }
+    }
+
+    public void reset() {
+        if (streamCache != null) {
+            streamCache.reset();
+        }
+        if (readCache != null) {
+            readCache.reset();
+        }
+        if (stream != null) {
+            try {
+                stream.reset();
+            } catch (IOException e) {
+                throw new RuntimeCamelException(e);
+            }
+        }
+    }
+
+    @Override
+    public InputStream getInputStream() {
+        if (stream == null) {
+            if (streamCache instanceof InputStream) {
+                stream = (InputStream) streamCache;
+            } else if (readCache != null) {
+                String data = readCache.getData();
+                stream = new ByteArrayInputStream(data.getBytes());
+            }
+        }
+        return stream;
+    }
+
+    @Override
+    public void setInputStream(InputStream inputStream) {
+        // noop as the input stream is from stream or reader cache
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java Wed Apr  1 09:54:55 2009
@@ -63,7 +63,6 @@
     }
        
     public void testCacheStreamToFileAndCloseStream() throws IOException {       
-        
         CachedOutputStream cos = new CachedOutputStream(16);
         cos.setOutputDir(file);
         cos.write(TEST_STRING.getBytes("UTF-8"));        
@@ -85,11 +84,9 @@
         }
         files = file.list();
         assertEquals("we should have no temp file", files.length, 0);
-       
     }
     
     public void testCacheStreamToFileAndNotCloseStream() throws IOException {       
-        
         CachedOutputStream cos = new CachedOutputStream(16);
         cos.setOutputDir(file);
         cos.write(TEST_STRING.getBytes("UTF-8"));        

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java Wed Apr  1 09:54:55 2009
@@ -19,7 +19,6 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
-import java.io.Reader;
 import java.io.StringReader;
 
 import javax.xml.transform.stream.StreamSource;
@@ -69,7 +68,8 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                errorHandler(deadLetterChannel("direct:errorHandler").maximumRedeliveries(3));
+                // 0 delay for faster unit test
+                errorHandler(deadLetterChannel("direct:errorHandler").maximumRedeliveries(3).delay(0));
                 from("direct:start").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         count++;

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Wed Apr  1 09:54:55 2009
@@ -371,27 +371,25 @@
             switch (type) {
             case Text: {
                 TextMessage message = session.createTextMessage();
-                String payload = context.getTypeConverter().convertTo(String.class, body);
+                String payload = context.getTypeConverter().convertTo(String.class, exchange, body);
                 message.setText(payload);
                 return message;
             }
             case Bytes: {
                 BytesMessage message = session.createBytesMessage();
-                byte[] payload = context.getTypeConverter().convertTo(byte[].class, body);
+                byte[] payload = context.getTypeConverter().convertTo(byte[].class, exchange, body);
                 message.writeBytes(payload);
                 return message;
             }
             case Map: {
                 MapMessage message = session.createMapMessage();
-                Map payload = context.getTypeConverter().convertTo(Map.class, body);
+                Map payload = context.getTypeConverter().convertTo(Map.class, exchange, body);
                 populateMapMessage(message, payload, context);
                 return message;
             }
             case Object:
-                return session.createObjectMessage((Serializable)body);
-            case Strem:
-                // TODO: Stream is not supported
-                break;
+                Serializable payload = context.getTypeConverter().convertTo(Serializable.class, exchange, body);
+                return session.createObjectMessage(payload);
             default:
                 break;
             }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageType.java?rev=760833&r1=760832&r2=760833&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageType.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageType.java Wed Apr  1 09:54:55 2009
@@ -23,6 +23,6 @@
  */
 public enum JmsMessageType {
 
-    Bytes, Map, Object, Strem, Text
+    Bytes, Map, Object, Stream, Text
 
 }

Added: camel/trunk/components/camel-jms/src/test/data/message1.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/data/message1.xml?rev=760833&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/data/message1.xml (added)
+++ camel/trunk/components/camel-jms/src/test/data/message1.xml Wed Apr  1 09:54:55 2009
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<person user="james">
+  <firstName>James</firstName>
+  <lastName>Strachan</lastName>
+  <city>London</city>
+</person>
\ No newline at end of file

Propchange: camel/trunk/components/camel-jms/src/test/data/message1.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/test/data/message1.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: camel/trunk/components/camel-jms/src/test/data/message1.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Added: camel/trunk/components/camel-jms/src/test/data/message2.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/data/message2.xml?rev=760833&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/data/message2.xml (added)
+++ camel/trunk/components/camel-jms/src/test/data/message2.xml Wed Apr  1 09:54:55 2009
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<person user="hiram">
+  <firstName>Hiram</firstName>
+  <lastName>Chirino</lastName>
+  <city>Tampa</city>
+</person>
\ No newline at end of file

Propchange: camel/trunk/components/camel-jms/src/test/data/message2.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/test/data/message2.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: camel/trunk/components/camel-jms/src/test/data/message2.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java?rev=760833&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java Wed Apr  1 09:54:55 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.io.FileInputStream;
+import javax.jms.ConnectionFactory;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.jaxp.StringSource;
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * For unit testing with XML streams that can be troublesome with the StreamCache
+ *
+ * @version $Revision$
+ */
+public class JmsXMLRouteTest extends ContextTestSupport {
+
+    private static final String TEST_LONDON = "src/test/data/message1.xml";
+    private static final String TEST_TAMPA = "src/test/data/message2.xml";
+
+    /**
+     * Sends the London message as a file backed {@link StreamSource} and expects it at mock:london.
+     */
+    public void testLondonWithFileStream() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:london");
+        mock.expectedMessageCount(1);
+        mock.message(0).bodyAs(String.class).contains("James");
+
+        sendFileAsStreamSource(TEST_LONDON);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Sends the Tampa message as a file backed {@link StreamSource} and expects it at mock:tampa.
+     */
+    public void testTampaWithFileStream() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:tampa");
+        mock.expectedMessageCount(1);
+        mock.message(0).bodyAs(String.class).contains("Hiram");
+
+        sendFileAsStreamSource(TEST_TAMPA);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Sends the London message as an in-memory {@link StringSource} and expects it at mock:london.
+     */
+    public void testLondonWithStringSource() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:london");
+        mock.expectedMessageCount(1);
+        mock.message(0).bodyAs(String.class).contains("James");
+
+        Source source = new StringSource("<person user=\"james\">\n"
+                + "  <firstName>James</firstName>\n"
+                + "  <lastName>Strachan</lastName>\n"
+                + "  <city>London</city>\n"
+                + "</person>");
+
+        template.sendBody("direct:start", source);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Opens the given file, sends it to direct:start wrapped in a {@link StreamSource},
+     * and always closes the file stream afterwards so the test does not leak a file handle.
+     *
+     * @param path the path of the XML file to send
+     * @throws Exception if the file cannot be opened or closed, or the send fails
+     */
+    private void sendFileAsStreamSource(String path) throws Exception {
+        FileInputStream in = new FileInputStream(path);
+        try {
+            template.sendBody("direct:start", new StreamSource(in));
+        } finally {
+            in.close();
+        }
+    }
+
+    /**
+     * Creates the context and registers an "activemq" component backed by an
+     * embedded, non persistent broker.
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    /**
+     * Routes direct:start through the JMS queue "foo", then dispatches on the
+     * city element to mock:london, mock:tampa or mock:unknown.
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("activemq:queue:foo");
+
+                from("activemq:queue:foo")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            Object body = exchange.getIn().getBody();
+                            // should preserve the object as Source
+                            assertIsInstanceOf(Source.class, body);
+                        }
+                    })
+                    .choice()
+                        .when().xpath("/person/city = 'London'").to("mock:london")
+                        .when().xpath("/person/city = 'Tampa'").to("mock:tampa")
+                        .otherwise().to("mock:unknown")
+                    .end();
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsXMLRouteTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date