You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2010/07/14 12:07:34 UTC

svn commit: r963998 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java

Author: ningjiang
Date: Wed Jul 14 10:07:34 2010
New Revision: 963998

URL: http://svn.apache.org/viewvc?rev=963998&view=rev
Log:
CAMEL-2947 CachedOutputStream now supports not closing itself when the exchange is completed

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
    camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java?rev=963998&r1=963997&r2=963998&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java Wed Jul 14 10:07:34 2010
@@ -56,8 +56,12 @@ public class CachedOutputStream extends 
 
     private long threshold = 64 * 1024;
     private File outputDir;
-
+    
     public CachedOutputStream(Exchange exchange) {
+        this(exchange, true);
+    }
+
+    public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) {
         String hold = exchange.getContext().getProperties().get(THRESHOLD);
         String dir = exchange.getContext().getProperties().get(TEMP_DIR);
         if (hold != null) {
@@ -67,25 +71,27 @@ public class CachedOutputStream extends 
             this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir);
         }
         
-        // add on completion so we can cleanup after the exchange is done such as deleting temporary files
-        exchange.addOnCompletion(new SynchronizationAdapter() {
-            @Override
-            public void onDone(Exchange exchange) {
-                try {
-                    if (fileInputStreamCache != null) {
-                        fileInputStreamCache.close();
+        if (closedOnCompletion) {
+            // add on completion so we can cleanup after the exchange is done such as deleting temporary files
+            exchange.addOnCompletion(new SynchronizationAdapter() {
+                @Override
+                public void onDone(Exchange exchange) {
+                    try {
+                        if (fileInputStreamCache != null) {
+                            fileInputStreamCache.close();
+                        }
+                        close();
+                    } catch (Exception e) {
+                        LOG.warn("Error deleting temporary cache file: " + tempFile, e);
                     }
-                    close();
-                } catch (Exception e) {
-                    LOG.warn("Error deleting temporary cache file: " + tempFile, e);
                 }
-            }
-
-            @Override
-            public String toString() {
-                return "OnCompletion[CachedOutputStream]";
-            }
-        });
+    
+                @Override
+                public String toString() {
+                    return "OnCompletion[CachedOutputStream]";
+                }
+            });
+        }
     }
 
     public void flush() throws IOException {
@@ -152,6 +158,11 @@ public class CachedOutputStream extends 
                 throw IOHelper.createIOException("Cached file " + tempFile + " not found", e);
             }
         }
+    }    
+    
+    public InputStream getWrappedInputStream() throws IOException {
+        // The WrappedInputStream will close the CachedOutputStream when it is closed
+        return new WrappedInputStream(this, getInputStream());
     }
 
 
@@ -206,5 +217,37 @@ public class CachedOutputStream extends 
             inMemory = false;
         }
     }
+    
+    // This class will close the CachedOutputStream when it is closed
+    private class WrappedInputStream extends InputStream {
+        private CachedOutputStream cachedOutputStream;
+        private InputStream inputStream;
+        
+        WrappedInputStream(CachedOutputStream cos, InputStream is) {
+            cachedOutputStream = cos;
+            inputStream = is;
+        }
+        
+        @Override
+        public int read() throws IOException {
+            return inputStream.read();
+        }
+        
+        @Override
+        public int available() throws IOException {
+            return inputStream.available();
+        }
+        
+        @Override
+        public void reset() throws IOException {
+            inputStream.reset();
+        }
+        
+        @Override
+        public void close() throws IOException {
+            inputStream.close();
+            cachedOutputStream.close();
+        }
+    }
 
 }

Modified: camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java?rev=963998&r1=963997&r2=963998&view=diff
==============================================================================
--- camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java (original)
+++ camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java Wed Jul 14 10:07:34 2010
@@ -228,9 +228,11 @@ public class HttpProducer extends Defaul
         // As httpclient is using a AutoCloseInputStream, it will be closed when the connection is closed
         // we need to cache the stream for it.
         try {
-            CachedOutputStream cos = new CachedOutputStream(exchange);
+            // This CachedOutputStream is not closed automatically when the exchange completes
+            CachedOutputStream cos = new CachedOutputStream(exchange, false);
             IOHelper.copy(is, cos);
-            return cos.getInputStream();
+            // When the InputStream is closed, the CachedOutputStream will be closed
+            return cos.getWrappedInputStream();
         } finally {
             IOHelper.close(is, "Extracting response body", LOG);            
         }