You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/07/26 13:29:54 UTC

svn commit: r797907 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/converter/stream/ camel-core/src/main/java/org/apache/camel/util/ camel-core/src/test/java/org/apache/camel/converter/stream/ camel-core/src/test/java/org/apache/camel/iss...

Author: davsclaus
Date: Sun Jul 26 11:29:54 2009
New Revision: 797907

URL: http://svn.apache.org/viewvc?rev=797907&view=rev
Log:
CAMEL-1849: Cleanup in CachedOutputStream. The HTTP component now uses a defensive copy of the response body stream to avoid returning a live stream that has already been closed. Also ensured that the file-based stream cache always deletes its temp files, as this is now done by the on-completion strategy.

Added:
    camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
    camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java Sun Jul 26 11:29:54 2009
@@ -20,20 +20,19 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
-import org.apache.camel.converter.IOConverter;
+import org.apache.camel.impl.SynchronizationAdapter;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.IOHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * This output stream will store the content into a File if the stream context size is exceed the
@@ -44,378 +43,155 @@
  * output stream will be deleted when you close this output stream or the cached inputStream.
  */
 public class CachedOutputStream extends OutputStream {
+    private static final transient Log LOG = LogFactory.getLog(CachedOutputStream.class);
+
     public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
     public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
    
-    protected boolean outputLocked;
-    protected OutputStream currentStream;
-
-    private final List<Object> streamList = new ArrayList<Object>();
-    private long threshold = 64 * 1024;
+    private OutputStream currentStream = new ByteArrayOutputStream(2048);
+    private boolean inMemory = true;
     private int totalLength;
-    private boolean inMemory;
     private File tempFile;
+
+    private long threshold = 64 * 1024;
     private File outputDir;
 
-    public CachedOutputStream() {
-        currentStream = new ByteArrayOutputStream(2048);
-        inMemory = true;
-    }
-
-    public CachedOutputStream(long threshold) {
-        this();
-        this.threshold = threshold;        
-    }
-    
-    public CachedOutputStream(Map<String, String> properties) {
-        this();
-        String value = properties.get(THRESHOLD);
-        if (value != null) {
-            int i = Integer.parseInt(value);
-            if (i > 0) {
-                threshold = i;
-            }
-        }
-        value = properties.get(TEMP_DIR);
-        if (value != null) {
-            File f = new File(value);
-            if (f.exists() && f.isDirectory()) {
-                outputDir = f;
-            } else {
-                outputDir = null;
+    public CachedOutputStream(Exchange exchange) {
+        String hold = exchange.getContext().getProperties().get(THRESHOLD);
+        String dir = exchange.getContext().getProperties().get(TEMP_DIR);
+        if (hold != null) {
+            this.threshold = exchange.getContext().getTypeConverter().convertTo(Long.class, hold);
+        }
+        if (dir != null) {
+            this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir);
+        }
+
+        // add on completion so we can cleanup after the exchange is done such as deleting temporary files
+        exchange.addOnCompletion(new SynchronizationAdapter() {
+            @Override
+            public void onDone(Exchange exchange) {
+                try {
+                    // cleanup temporary file
+                    if (tempFile != null) {
+                        boolean deleted = tempFile.delete();
+                        if (!deleted) {
+                            LOG.warn("Cannot delete temporary cache file: " + tempFile);
+                        } else if (LOG.isTraceEnabled()) {
+                            LOG.trace("Deleted temporary cache file: " + tempFile);
+                        }
+                        tempFile = null;
+                    }
+                } catch (Exception e) {
+                    LOG.warn("Error deleting temporary cache file: " + tempFile, e);
+                }
             }
-        } else {
-            outputDir = null;
-        }        
-    }
 
-    /**
-     * Perform any actions required on stream flush (freeze headers, reset
-     * output stream ... etc.)
-     */
-    protected void doFlush() throws IOException {
+            @Override
+            public String toString() {
+                return "OnCompletion[CachedOutputStream]";
+            }
+        });
     }
 
     public void flush() throws IOException {
         currentStream.flush();       
-        doFlush();
     }
 
-    /**
-     * Perform any actions required on stream closure (handle response etc.)
-     */
-    protected void doClose() throws IOException {
-    }
-    
-    /**
-     * Perform any actions required after stream closure (close the other related stream etc.)
-     */
-    protected void postClose() throws IOException {
-    }
-
-    /**
-     * Locks the output stream to prevent additional writes, but maintains
-     * a pointer to it so an InputStream can be obtained
-     * @throws IOException
-     */
-    public void lockOutputStream() throws IOException {
-        currentStream.flush();
-        outputLocked = true;
-        streamList.remove(currentStream);
-    }
-    
     public void close() throws IOException {
-        currentStream.flush();        
-        doClose();
         currentStream.close();
-        maybeDeleteTempFile(currentStream);
-        postClose();
     }
 
     public boolean equals(Object obj) {
         return currentStream.equals(obj);
     }
 
-    /**
-     * Replace the original stream with the new one, optionally copying the content of the old one
-     * into the new one.
-     * When with Attachment, needs to replace the xml writer stream with the stream used by
-     * AttachmentSerializer or copy the cached output stream to the "real"
-     * output stream, i.e. onto the wire.
-     * 
-     * @param out the new output stream
-     * @param copyOldContent flag indicating if the old content should be copied
-     * @throws IOException
-     */
-    public void resetOut(OutputStream out, boolean copyOldContent) throws IOException {
-        if (out == null) {
-            out = new ByteArrayOutputStream();
-        }
-
-        if (currentStream instanceof CachedOutputStream) {
-            CachedOutputStream ac = (CachedOutputStream) currentStream;
-            InputStream in = ac.getInputStream();
-            IOHelper.copyAndCloseInput(in, out);
-        } else {
-            if (inMemory) {
-                if (currentStream instanceof ByteArrayOutputStream) {
-                    ByteArrayOutputStream byteOut = (ByteArrayOutputStream) currentStream;
-                    if (copyOldContent && byteOut.size() > 0) {
-                        byteOut.writeTo(out);
-                    }
-                } else {
-                    throw new IOException("Unknown format of currentStream: " + currentStream);
-                }
-            } else {
-                // read the file
-                currentStream.close();
-                FileInputStream fin = new FileInputStream(tempFile);
-                if (copyOldContent) {
-                    IOHelper.copyAndCloseInput(fin, out);
-                }
-                streamList.remove(currentStream);
-                tempFile.delete();
-                tempFile = null;
-                inMemory = true;
-            }
-        }
-        currentStream = out;
-        outputLocked = false;
-    }
-
-    public static void copyStream(InputStream in, OutputStream out, int bufferSize) throws IOException {
-        IOHelper.copyAndCloseInput(in, out, bufferSize);
-    }
-
-    public int size() {
-        return totalLength;
-    }
-    
-    public byte[] getBytes() throws IOException {
-        flush();
-        if (inMemory) {
-            if (currentStream instanceof ByteArrayOutputStream) {
-                return ((ByteArrayOutputStream)currentStream).toByteArray();
-            } else {
-                throw new IOException("Unknown format of currentStream");
-            }
-        } else {
-            // read the file
-            FileInputStream fin = new FileInputStream(tempFile);
-            return IOConverter.toBytes(fin);
-        }
-    }
-    
-    public void writeCacheTo(OutputStream out) throws IOException {
-        flush();
-        if (inMemory) {
-            if (currentStream instanceof ByteArrayOutputStream) {
-                ((ByteArrayOutputStream)currentStream).writeTo(out);
-            } else {
-                throw new IOException("Unknown format of currentStream");
-            }
-        } else {
-            // read the file
-            FileInputStream fin = new FileInputStream(tempFile);
-            IOHelper.copyAndCloseInput(fin, out);
-        }
-    }
-    
-    
-    public void writeCacheTo(StringBuilder out, int limit) throws IOException {
-        flush();
-        if (totalLength < limit
-            || limit == -1) {
-            writeCacheTo(out);
-            return;
-        }
-        
-        int count = 0;
-        if (inMemory) {
-            if (currentStream instanceof ByteArrayOutputStream) {
-                byte bytes[] = ((ByteArrayOutputStream)currentStream).toByteArray();
-                out.append(IOHelper.newStringFromBytes(bytes, 0, limit));
-            } else {
-                throw new IOException("Unknown format of currentStream: " + currentStream);
-            }
-        } else {
-            // read the file
-            FileInputStream fin = new FileInputStream(tempFile);
-            byte bytes[] = new byte[1024];
-            int x = fin.read(bytes);
-            while (x != -1) {
-                if ((count + x) > limit) {
-                    x = limit - count;
-                }
-                out.append(IOHelper.newStringFromBytes(bytes, 0, x));
-                count += x;
-                
-                if (count >= limit) {
-                    x = -1;
-                } else {
-                    x = fin.read(bytes);
-                }
-            }
-            fin.close();
-        }
-    }
-    public void writeCacheTo(StringBuilder out) throws IOException {
-        flush();
-        if (inMemory) {
-            if (currentStream instanceof ByteArrayOutputStream) {
-                byte[] bytes = ((ByteArrayOutputStream)currentStream).toByteArray();
-                out.append(IOHelper.newStringFromBytes(bytes));
-            } else {
-                throw new IOException("Unknown format of currentStream: " + currentStream);
-            }
-        } else {
-            // read the file
-            FileInputStream fin = new FileInputStream(tempFile);
-            byte bytes[] = new byte[1024];
-            int x = fin.read(bytes);
-            while (x != -1) {
-                out.append(IOHelper.newStringFromBytes(bytes, 0, x));
-                x = fin.read(bytes);
-            }
-            fin.close();
-        }
-    }    
-    
-
-    /**
-     * @return the underlying output stream
-     */
-    public OutputStream getOut() {
-        return currentStream;
-    }
-
     public int hashCode() {
         return currentStream.hashCode();
     }
 
     public String toString() {
-        StringBuilder builder = new StringBuilder().append("[")
-            .append(CachedOutputStream.class.getName())
-            .append(" Content: ");
-        try {
-            writeCacheTo(builder);
-        } catch (IOException e) {
-            //ignore
-        }
-        return builder.append("]").toString();
-    }
-
-    protected void onWrite() throws IOException {
+        return "CachedOutputStream[size: " + totalLength + "]";
     }
 
     public void write(byte[] b, int off, int len) throws IOException {
-        if (!outputLocked) {
-            onWrite();
-            this.totalLength += len;
-            if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
-                createFileOutputStream();
-            }
-            currentStream.write(b, off, len);
+        this.totalLength += len;
+        if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+            pageToFileStream();
         }
+        currentStream.write(b, off, len);
+        flush();
     }
 
     public void write(byte[] b) throws IOException {
-        if (!outputLocked) {
-            onWrite();
-            this.totalLength += b.length;
-            if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
-                createFileOutputStream();
-            }
-            currentStream.write(b);
+        this.totalLength += b.length;
+        if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+            pageToFileStream();
         }
+        currentStream.write(b);
+        flush();
     }
 
     public void write(int b) throws IOException {
-        if (!outputLocked) {
-            onWrite();
-            this.totalLength++;
-            if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
-                createFileOutputStream();
-            }
-            currentStream.write(b);
-        }
-    }
-
-    private void createFileOutputStream() throws IOException {
-        ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
-        if (outputDir == null) {
-            tempFile = FileUtil.createTempFile("cos", "tmp");
-        } else {
-            tempFile = FileUtil.createTempFile("cos", "tmp", outputDir, false);
+        this.totalLength++;
+        if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
+            pageToFileStream();
         }
-        
-        currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
-        bout.writeTo(currentStream);
-        inMemory = false;
-        streamList.add(currentStream);
-    }
-
-    public File getTempFile() {
-        return tempFile != null && tempFile.exists() ? tempFile : null;
+        currentStream.write(b);
+        flush();
     }
 
     public InputStream getInputStream() throws IOException {
-        flush();
         if (inMemory) {
             if (currentStream instanceof ByteArrayOutputStream) {
                 return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray());
             } else {
-                return null;
+                throw new IllegalStateException("CurrentStream should be an instance of ByteArrayOutputStream but is: " + currentStream.getClass().getName());
             }
         } else {
             try {
-                FileInputStream fileInputStream = new FileInputStream(tempFile) {
-                    public void close() throws IOException {
-                        super.close();
-                        maybeDeleteTempFile(this);
-                    }
-                };
-                streamList.add(fileInputStream);
-                return fileInputStream;
+                return new FileInputStreamCache(tempFile, this);
             } catch (FileNotFoundException e) {
-                throw IOHelper.createIOException("Cached file was already deleted", e);
+                throw IOHelper.createIOException("Cached file " + tempFile + " not found", e);
             }
         }
     }
-    
+
+
     public StreamCache getStreamCache() throws IOException {
-        flush();
         if (inMemory) {
             if (currentStream instanceof ByteArrayOutputStream) {
                 return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
             } else {
-                return null;
+                throw new IllegalStateException("CurrentStream should be an instance of ByteArrayOutputStream but is: " + currentStream.getClass().getName());
             }
         } else {
             try {
                 return new FileInputStreamCache(tempFile, this);
             } catch (FileNotFoundException e) {
-                throw IOHelper.createIOException("Cached file was already deleted", e);
+                throw IOHelper.createIOException("Cached file " + tempFile + " not found", e);
             }
         }
     }
-    
-    private void maybeDeleteTempFile(Object stream) {        
-        streamList.remove(stream);        
-        if (!inMemory && tempFile != null && streamList.isEmpty()) {
-            tempFile.delete();
-            tempFile = null;
-            currentStream = new ByteArrayOutputStream(1024);
-            inMemory = true;
+
+    private void pageToFileStream() throws IOException {
+        ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
+        if (outputDir == null) {
+            tempFile = FileUtil.createTempFile("cos", ".tmp");
+        } else {
+            tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir);
         }
-    }
 
-    public void setOutputDir(File outputDir) throws IOException {
-        this.outputDir = outputDir;
-    }
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Creating temporary stream cache file: " + tempFile);
+        }
 
-    public void setThreshold(long threshold) {
-        this.threshold = threshold;
+        try {
+            currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
+            bout.writeTo(currentStream);
+        } finally {
+            // ensure flag is flipped to file based
+            inMemory = false;
+        }
     }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java Sun Jul 26 11:29:54 2009
@@ -32,9 +32,6 @@
     private CachedOutputStream cachedOutputStream;
     private File file;
 
-    public FileInputStreamCache() {
-    }
-
     public FileInputStreamCache(File file, CachedOutputStream cos) throws FileNotFoundException {
         this.file = file;
         this.cachedOutputStream = cos;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java Sun Jul 26 11:29:54 2009
@@ -64,8 +64,7 @@
 
     @Converter
     public StreamCache convertToStreamCache(InputStream stream, Exchange exchange) throws IOException {
-        // set up CachedOutputStream with the properties
-        CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
+        CachedOutputStream cos = new CachedOutputStream(exchange);
         IOHelper.copyAndCloseInput(stream, cos);
         return cos.getStreamCache();
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamSourceCache.java Sun Jul 26 11:29:54 2009
@@ -39,7 +39,7 @@
     public StreamSourceCache(StreamSource source, Exchange exchange) throws IOException {
         if (source.getInputStream() != null) {
             // set up CachedOutputStream with the properties
-            CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
+            CachedOutputStream cos = new CachedOutputStream(exchange);
             IOHelper.copyAndCloseInput(source.getInputStream(), cos);
             streamCache = cos.getStreamCache();
             readCache = null;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java Sun Jul 26 11:29:54 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Locale;
+import java.util.Random;
 import java.util.Stack;
 
 /**
@@ -44,53 +45,9 @@
         return path;
     }
     
-    private static synchronized File getDefaultTempDir() {
-        if (defaultTempDir != null
-            && defaultTempDir.exists()) {
-            return defaultTempDir;
-        }
-        
-        String s = null;
-        try {
-            s = System.getProperty(FileUtil.class.getName() + ".TempDirectory");
-        } catch (SecurityException e) {
-            //Ignorable, we'll use the default
-        }
-        if (s == null) {
-            int x = (int)(Math.random() * 1000000);
-            s = System.getProperty("java.io.tmpdir");
-            File checkExists = new File(s);
-            if (!checkExists.exists()) {
-                throw new RuntimeException("The directory " 
-                                       + checkExists.getAbsolutePath() 
-                                       + " does not exist, please set java.io.tempdir"
-                                       + " to an existing directory");
-            }
-            File f = new File(s, "camel-tmp-" + x);
-            while (!f.mkdir()) {
-                x = (int)(Math.random() * 1000000);
-                f = new File(s, "camel-tmp-" + x);
-            }
-            defaultTempDir = f;
-            Thread hook = new Thread() {
-                @Override
-                public void run() {
-                    removeDir(defaultTempDir);
-                }
-            };
-            Runtime.getRuntime().addShutdownHook(hook);            
-        } else {
-            //assume someone outside of us will manage the directory
-            File f = new File(s);
-            f.mkdirs();
-            defaultTempDir = f;
-        }
-        return defaultTempDir;
-    }
-
     public static void mkDir(File dir) {
         if (dir == null) {
-            throw new RuntimeException("dir attribute is required");
+            throw new IllegalArgumentException("dir attribute is required");
         }
 
         if (dir.isFile()) {
@@ -130,8 +87,7 @@
         if (list == null) {
             list = new String[0];
         }
-        for (int i = 0; i < list.length; i++) {
-            String s = list[i];
+        for (String s : list) {
             File f = new File(d, s);
             if (f.isDirectory()) {
                 removeDir(f);
@@ -164,15 +120,11 @@
     }
 
     public static File createTempFile(String prefix, String suffix) throws IOException {
-        return createTempFile(prefix, suffix, null, false);
+        return createTempFile(prefix, suffix, null);
     }
-    
-    public static File createTempFile(String prefix, String suffix, File parentDir,
-                               boolean deleteOnExit) throws IOException {
-        File result = null;
-        File parent = (parentDir == null)
-            ? getDefaultTempDir()
-            : parentDir;
+
+    public static File createTempFile(String prefix, String suffix, File parentDir) throws IOException {
+        File parent = (parentDir == null) ? getDefaultTempDir() : parentDir;
             
         if (suffix == null) {
             suffix = ".tmp";
@@ -182,15 +134,11 @@
         } else if (prefix.length() < 3) {
             prefix = prefix + "camel";
         }
-        result = File.createTempFile(prefix, suffix, parent);
 
-        //if parentDir is null, we're in our default dir
-        //which will get completely wiped on exit from our exit
-        //hook.  No need to set deleteOnExit() which leaks memory.
-        if (deleteOnExit && parentDir != null) {
-            result.deleteOnExit();
-        }
-        return result;
+        // create parent folder
+        parent.mkdirs();
+
+        return File.createTempFile(prefix, suffix, parent);
     }
 
     /**
@@ -286,4 +234,42 @@
         return sb.toString();
     }
 
+    private static synchronized File getDefaultTempDir() {
+        if (defaultTempDir != null && defaultTempDir.exists()) {
+            return defaultTempDir;
+        }
+
+        String s = System.getProperty("java.io.tmpdir");
+        File checkExists = new File(s);
+        if (!checkExists.exists()) {
+            throw new RuntimeException("The directory "
+                                   + checkExists.getAbsolutePath()
+                                   + " does not exist, please set java.io.tempdir"
+                                   + " to an existing directory");
+        }
+
+        // why do we create another tmp folder
+        Random ran = new Random();
+        int x = ran.nextInt(1000000);
+
+        File f = new File(s, "camel-tmp-" + x);
+        while (!f.mkdir()) {
+            x = ran.nextInt(1000000);
+            f = new File(s, "camel-tmp-" + x);
+        }
+
+        defaultTempDir = f;
+
+        // create shutdown hook to remove the temp dir
+        Thread hook = new Thread() {
+            @Override
+            public void run() {
+                removeDir(defaultTempDir);
+            }
+        };
+        Runtime.getRuntime().addShutdownHook(hook);
+
+        return defaultTempDir;
+    }
+
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java Sun Jul 26 11:29:54 2009
@@ -22,28 +22,35 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 
-import junit.framework.TestCase;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
 import org.apache.camel.converter.IOConverter;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultUnitOfWork;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.util.CollectionStringBuffer;
 
-public class CachedOutputStreamTest extends TestCase {
+public class CachedOutputStreamTest extends ContextTestSupport {
     private static final String TEST_STRING = "This is a test string and it has enough" 
         + " aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa ";
-    
-    private File file = new File("./target/cacheFile");
 
-    private static void deleteDirectory(File file) {
-        if (file.isDirectory()) {
-            File[] files = file.listFiles();
-            for (int i = 0; i < files.length; i++) {
-                deleteDirectory(files[i]);
-            }
-        }
-        file.delete();
+    private Exchange exchange;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        
+        context.getProperties().put(CachedOutputStream.TEMP_DIR, "./target/cachedir");
+        context.getProperties().put(CachedOutputStream.THRESHOLD, "16");
+        deleteDirectory("./target/cachedir");
+        createDirectory("./target/cachedir");
+
+        exchange = new DefaultExchange(context);
+        UnitOfWork uow = new DefaultUnitOfWork(exchange);
+        exchange.setUnitOfWork(uow);
     }
-    
-    private static String toString(InputStream input) throws IOException {        
+
+    private static String toString(InputStream input) throws IOException {
         BufferedReader reader = new BufferedReader(new InputStreamReader(input));
         CollectionStringBuffer builder = new CollectionStringBuffer("\n");
         while (true) {
@@ -54,18 +61,12 @@
             builder.append(line);
         }
     }
-    
-    protected void setUp() throws Exception {        
-        if (file.exists()) {
-            deleteDirectory(file);
-        }
-        file.mkdirs();
-    }
-       
+
     public void testCacheStreamToFileAndCloseStream() throws IOException {       
-        CachedOutputStream cos = new CachedOutputStream(16);
-        cos.setOutputDir(file);
-        cos.write(TEST_STRING.getBytes("UTF-8"));        
+        CachedOutputStream cos = new CachedOutputStream(exchange);
+        cos.write(TEST_STRING.getBytes("UTF-8"));
+
+        File file = new File("./target/cachedir");
         String[] files = file.list();
         assertEquals("we should have a temp file", files.length, 1);
         assertTrue("The file name should start with cos" , files[0].startsWith("cos"));
@@ -73,8 +74,12 @@
         StreamCache cache = cos.getStreamCache();
         assertTrue("Should get the FileInputStreamCache", cache instanceof FileInputStreamCache);
         String temp = toString((InputStream)cache);
+
         ((InputStream)cache).close();
         assertEquals("Cached a wrong file", temp, TEST_STRING);
+
+        exchange.getUnitOfWork().done(exchange);
+
         try {
             cache.reset();
             // The stream is closed, so the temp file is gone.
@@ -82,14 +87,17 @@
         } catch (Exception exception) {
             // do nothing
         }
+
+
         files = file.list();
         assertEquals("we should have no temp file", files.length, 0);
     }
     
-    public void testCacheStreamToFileAndNotCloseStream() throws IOException {       
-        CachedOutputStream cos = new CachedOutputStream(16);
-        cos.setOutputDir(file);
-        cos.write(TEST_STRING.getBytes("UTF-8"));        
+    public void testCacheStreamToFileAndNotCloseStream() throws IOException {
+        CachedOutputStream cos = new CachedOutputStream(exchange);
+        cos.write(TEST_STRING.getBytes("UTF-8"));
+
+        File file = new File("./target/cachedir");
         String[] files = file.list();
         assertEquals("we should have a temp file", files.length, 1);
         assertTrue("The file name should start with cos" , files[0].startsWith("cos"));
@@ -102,20 +110,28 @@
         temp = toString((InputStream)cache);
         assertEquals("Cached a wrong file", temp, TEST_STRING);
         
+        exchange.getUnitOfWork().done(exchange);
+
         ((InputStream)cache).close();
         files = file.list();
         assertEquals("we should have no temp file", files.length, 0);       
     }
     
     public void testCacheStreamToMemory() throws IOException {
-        CachedOutputStream cos = new CachedOutputStream();
-        cos.setOutputDir(file);
-        cos.write(TEST_STRING.getBytes("UTF-8"));        
+        context.getProperties().put(CachedOutputStream.THRESHOLD, "1024");
+
+        CachedOutputStream cos = new CachedOutputStream(exchange);
+        cos.write(TEST_STRING.getBytes("UTF-8"));
+
+        File file = new File("./target/cachedir");
         String[] files = file.list();
+
         assertEquals("we should have no temp file", files.length, 0);
         StreamCache cache = cos.getStreamCache();
         assertTrue("Should get the InputStreamCache", cache instanceof InputStreamCache);
         String temp = IOConverter.toString((InputStream)cache);
         assertEquals("Cached a wrong file", temp, TEST_STRING);
+
+        exchange.getUnitOfWork().done(exchange);
     }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java Sun Jul 26 11:29:54 2009
@@ -76,6 +76,7 @@
             // since the stream is closed you delete the temp file
             // reset will not work any more
             cache.reset();
+            exchange.getUnitOfWork().done(exchange);
             fail("except the exception here");
         } catch (Exception exception) {
             // do nothing

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java Sun Jul 26 11:29:54 2009
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.issues;
 
 import java.io.ByteArrayInputStream;

Modified: camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java?rev=797907&r1=797906&r2=797907&view=diff
==============================================================================
--- camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java (original)
+++ camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java Sun Jul 26 11:29:54 2009
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
+import java.io.ByteArrayInputStream;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -119,20 +120,27 @@
         HttpOperationFailedException exception;
         Header[] headers = method.getResponseHeaders();
         InputStream is = extractResponseBody(method, exchange);
+        // make a defensive copy of the response body in the exception so it is detached from the cache
+        InputStream copy = null;
+        if (is != null) {
+            copy = new ByteArrayInputStream(exchange.getContext().getTypeConverter().convertTo(byte[].class, is));
+        }
+
         if (responseCode >= 300 && responseCode < 400) {
             String redirectLocation;
             Header locationHeader = method.getResponseHeader("location");
             if (locationHeader != null) {
                 redirectLocation = locationHeader.getValue();
-                exception = new HttpOperationFailedException(responseCode, method.getStatusLine(), redirectLocation, headers, is);
+                exception = new HttpOperationFailedException(responseCode, method.getStatusLine(), redirectLocation, headers, copy);
             } else {
                 // no redirect location
-                exception = new HttpOperationFailedException(responseCode, method.getStatusLine(), headers, is);
+                exception = new HttpOperationFailedException(responseCode, method.getStatusLine(), headers, copy);
             }
         } else {
             // internal server error (error code 500)
-            exception = new HttpOperationFailedException(responseCode, method.getStatusLine(), headers, is);
+            exception = new HttpOperationFailedException(responseCode, method.getStatusLine(), headers, copy);
         }
+
         return exception;
     }
 
@@ -169,7 +177,7 @@
 
     private static InputStream doExtractResponseBody(InputStream is, Exchange exchange) throws IOException {
         try {
-            CachedOutputStream cos = new CachedOutputStream(exchange.getContext().getProperties());
+            CachedOutputStream cos = new CachedOutputStream(exchange);
             IOHelper.copy(is, cos);
             return cos.getInputStream();
         } finally {

Added: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java?rev=797907&view=auto
==============================================================================
--- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java (added)
+++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java Sun Jul 26 11:29:54 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty;
+
+import java.io.File;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.http.HttpOperationFailedException;
+import org.apache.camel.converter.stream.CachedOutputStream;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Verifies that the stream cache directory is empty after a request through
+ * the http endpoint has completed, both for a normal response and for a
+ * response that results in an {@link HttpOperationFailedException}.
+ *
+ * @version $Revision$
+ */
+public class HttpStreamCacheFileTest extends CamelTestSupport {
+
+    // directory the route configures as CachedOutputStream.TEMP_DIR
+    private static final String CACHE_DIR = "./target/cachedir";
+
+    // 50 character payload returned with the 500 status, larger than the configured threshold value of 16
+    private static final String FAILURE_BODY = "12345678901234567890123456789012345678901234567890";
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        // start every test from an empty cache directory
+        deleteDirectory(CACHE_DIR);
+        createDirectory(CACHE_DIR);
+        super.setUp();
+    }
+
+    @Test
+    public void testStreamCacheToFileShouldBeDeletedInCaseOfResponse() throws Exception {
+        String out = template.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", out);
+
+        assertCacheDirIsEmpty();
+    }
+
+    @Test
+    public void testStreamCacheToFileShouldBeDeletedInCaseOfException() throws Exception {
+        try {
+            template.requestBody("direct:start", null, String.class);
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            HttpOperationFailedException hofe = assertIsInstanceOf(HttpOperationFailedException.class, e.getCause());
+            // the response body must still be readable from the exception
+            String s = context.getTypeConverter().convertTo(String.class, hofe.getResponseBody());
+            assertEquals("Response body", FAILURE_BODY, s);
+        }
+
+        assertCacheDirIsEmpty();
+    }
+
+    /**
+     * Asserts that the cache directory exists and contains no files,
+     * i.e. the temporary files have been deleted.
+     */
+    private static void assertCacheDirIsEmpty() {
+        String[] files = new File(CACHE_DIR).list();
+        // File.list() returns null when the path is not a readable directory
+        assertNotNull("Cache directory should exist", files);
+        assertEquals("There should be no files", 0, files.length);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // enable stream caching and use a low threshold so it is forced to write to file
+                context.getProperties().put(CachedOutputStream.TEMP_DIR, CACHE_DIR);
+                context.getProperties().put(CachedOutputStream.THRESHOLD, "16");
+                context.setStreamCaching(true);
+
+                // use a route so we get a unit of work
+                from("direct:start").to("http://localhost:8123/myserver");
+
+                from("jetty://http://localhost:8123/myserver")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                if (exchange.getIn().getBody() == null) {
+                                    // no request body: answer with an error status and the long payload
+                                    exchange.getOut().setBody(FAILURE_BODY);
+                                    exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 500);
+                                } else {
+                                    exchange.getOut().setBody("Bye World");
+                                }
+                            }
+                        });
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/HttpStreamCacheFileTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date