You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/14 10:09:59 UTC

svn commit: r963966 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/converter/stream/ main/java/org/apache/camel/impl/converter/ test/java/org/apache/camel/converter/stream/ test/java/org/apache/camel/management/ test/resources/

Author: davsclaus
Date: Wed Jul 14 08:09:58 2010
New Revision: 963966

URL: http://svn.apache.org/viewvc?rev=963966&view=rev
Log:
CAMEL-2944: Fixed issue with stream cache spooled to files on Windows and deleting the temporary files when Exchange done

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
    camel/trunk/camel-core/src/test/resources/log4j.properties

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java?rev=963966&r1=963965&r2=963966&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java Wed Jul 14 08:09:58 2010
@@ -25,8 +25,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.StreamCache;
@@ -54,9 +52,7 @@ public class CachedOutputStream extends 
     private boolean inMemory = true;
     private int totalLength;
     private File tempFile;
-    private boolean exchangeOnCompleted;
-    
-    private List<FileInputStreamCache> fileInputStreamCaches = new ArrayList<FileInputStreamCache>(4);
+    private FileInputStreamCache fileInputStreamCache;
 
     private long threshold = 64 * 1024;
     private File outputDir;
@@ -76,12 +72,10 @@ public class CachedOutputStream extends 
             @Override
             public void onDone(Exchange exchange) {
                 try {
-                    //set the flag so we can delete the temp file 
-                    exchangeOnCompleted = true;
-                    if (fileInputStreamCaches.size() == 0) {
-                        // there is no open fileInputStream let's close it 
-                        close();
+                    if (fileInputStreamCache != null) {
+                        fileInputStreamCache.close();
                     }
+                    close();
                 } catch (Exception e) {
                     LOG.warn("Error deleting temporary cache file: " + tempFile, e);
                 }
@@ -150,9 +144,10 @@ public class CachedOutputStream extends 
             }
         } else {
             try {
-                FileInputStreamCache answer = new FileInputStreamCache(tempFile, this);
-                fileInputStreamCaches.add(answer);
-                return answer;
+                if (fileInputStreamCache == null) {
+                    fileInputStreamCache = new FileInputStreamCache(tempFile);
+                }
+                return fileInputStreamCache;
             } catch (FileNotFoundException e) {
                 throw IOHelper.createIOException("Cached file " + tempFile + " not found", e);
             }
@@ -171,23 +166,16 @@ public class CachedOutputStream extends 
             }
         } else {
             try {
-                FileInputStreamCache answer = new FileInputStreamCache(tempFile, this);
-                fileInputStreamCaches.add(answer);
-                return answer;
+                if (fileInputStreamCache == null) {
+                    fileInputStreamCache = new FileInputStreamCache(tempFile);
+                }
+                return fileInputStreamCache;
             } catch (FileNotFoundException e) {
                 throw IOHelper.createIOException("Cached file " + tempFile + " not found", e);
             }
         }
     }
-    
-    public void releaseFileInputStream(FileInputStreamCache fileInputStreamCache) throws IOException {
-        fileInputStreamCaches.remove(fileInputStreamCache);
-        if (exchangeOnCompleted && fileInputStreamCaches.size() == 0) {
-            // now we can close stream and delete the temp file
-            close();
-        }
-    }
-    
+
     private void cleanUpTempFile() {
         // cleanup temporary file
         if (tempFile != null) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java?rev=963966&r1=963965&r2=963966&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java Wed Jul 14 08:09:58 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.converter.stream;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -27,42 +28,31 @@ import org.apache.camel.RuntimeCamelExce
 import org.apache.camel.StreamCache;
 import org.apache.camel.util.IOHelper;
 
-public class FileInputStreamCache extends InputStream implements StreamCache {
+public class FileInputStreamCache extends InputStream implements StreamCache, Closeable {
     private InputStream stream;
-    private CachedOutputStream cachedOutputStream;
     private File file;
 
-    public FileInputStreamCache(File file, CachedOutputStream cos) throws FileNotFoundException {
+    public FileInputStreamCache(File file) throws FileNotFoundException {
         this.file = file;
-        this.cachedOutputStream = cos;
         this.stream = new FileInputStream(file);
     }
     
     @Override
     public void close() {
-        try {
-            if (isSteamOpened()) {
-                getInputStream().close();
-            }
-            // Just remove the itself from cachedOutputStream
-            if (cachedOutputStream != null) {
-                cachedOutputStream.releaseFileInputStream(this);
-            }
-        } catch (Exception e) {
-            throw new RuntimeCamelException(e);
+        if (isSteamOpened()) {
+            IOHelper.close(getInputStream());
         }
     }
 
     @Override
     public void reset() {
         try {
-            if (isSteamOpened()) {
-                getInputStream().close();
-            }
+            // reset by closing and creating a new stream based on the file
+            close();
             // reset by creating a new stream based on the file
             stream = new FileInputStream(file);
         } catch (Exception e) {
-            throw new RuntimeCamelException(e);
+            throw new RuntimeCamelException("Cannot reset stream from file " + file, e);
         }            
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java?rev=963966&r1=963965&r2=963966&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java Wed Jul 14 08:09:58 2010
@@ -165,6 +165,9 @@ public class DefaultTypeConverter extend
         // try to find a suitable type converter
         TypeConverter converter = getOrFindTypeConverter(type, value);
         if (converter != null) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Using converter: " + converter + " to convert " + key);
+            }
             Object rc = converter.convertTo(type, exchange, value);
             if (rc != null) {
                 return rc;

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java?rev=963966&r1=963965&r2=963966&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java Wed Jul 14 08:09:58 2010
@@ -33,9 +33,8 @@ public class FileInputStreamCacheTest ex
     public void testFileInputStreamCache() throws Exception {
         Exchange exchange = new DefaultExchange(context);
 
-        CachedOutputStream cos = new CachedOutputStream(exchange);
         File file = new File(TEST_FILE).getAbsoluteFile();
-        FileInputStreamCache cache = new FileInputStreamCache(file, cos);
+        FileInputStreamCache cache = new FileInputStreamCache(file);
 
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         cache.writeTo(bos);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java?rev=963966&r1=963965&r2=963966&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java Wed Jul 14 08:09:58 2010
@@ -89,8 +89,6 @@ public class EventNotifierRedeliveryEven
         template.sendBody("direct:start", "Hello World");
         assertMockEndpointsSatisfied();
 
-        Thread.sleep(1000);
-
         assertEquals(9, events.size());
 
         assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(0));
@@ -112,7 +110,7 @@ public class EventNotifierRedeliveryEven
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(4).asyncDelayedRedelivery().redeliveryDelay(25));
+                errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(4).asyncDelayedRedelivery().redeliveryDelay(100));
 
                 from("direct:start").throwException(new IllegalArgumentException("Damn"));
             }
@@ -123,6 +121,8 @@ public class EventNotifierRedeliveryEven
         template.sendBody("direct:start", "Hello World");
         assertMockEndpointsSatisfied();
 
+        Thread.sleep(500);
+
         assertEquals(9, events.size());
 
         assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(0));
@@ -135,9 +135,8 @@ public class EventNotifierRedeliveryEven
         e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(4));
         assertEquals(4, e.getAttempt());
         assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
-        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
-        assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
-        assertIsInstanceOf(ExchangeSentEvent.class, events.get(8));
+
+        // since its async the ordering of the rest can be different depending per OS and timing
     }
 
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=963966&r1=963965&r2=963966&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Wed Jul 14 08:09:58 2010
@@ -40,6 +40,8 @@ log4j.logger.org.apache.camel.impl.conve
 log4j.logger.org.apache.camel.management=WARN
 log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
 #log4j.logger.org.apache.camel.impl=TRACE
+#log4j.logger.org.apache.camel.util.FileUtil=TRACE
+#log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender



Re: svn commit: r963966 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/converter/stream/ main/java/org/apache/camel/impl/converter/ test/java/org/apache/camel/converter/stream/ test/java/org/apache/camel/management/ test/resources/

Posted by Willem Jiang <wi...@gmail.com>.
Hi Claus,

Don't worry, I just find a way to work around this issue.
Will commit a quick fix for it shortly.

Willem



Claus Ibsen wrote:
> On Wed, Jul 14, 2010 at 11:08 AM, Willem Jiang <wi...@gmail.com> wrote:
>> Hi Claus,
>>
>> You change broke the test of JettyHttpFileCacheTest[1].
>> For this http route, the input stream of the response should not be closed
>> even the exchange is done, as it will be used for sending the response for
>> the route point.
>>
>> Here is the JIRA[1] why we added this test.
>>
>> [1]https://svn.apache.org/repos/asf/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/issues/JettyHttpFileCacheTest.java
>> [2]https://issues.apache.org/activemq/browse/CAMEL-2776
>>
> 
> Thanks I will look into it later today.
> 
> 
> 
>> Willem
>>

Re: svn commit: r963966 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/converter/stream/ main/java/org/apache/camel/impl/converter/ test/java/org/apache/camel/converter/stream/ test/java/org/apache/camel/management/ test/resources/

Posted by Claus Ibsen <cl...@gmail.com>.
On Wed, Jul 14, 2010 at 11:08 AM, Willem Jiang <wi...@gmail.com> wrote:
> Hi Claus,
>
> You change broke the test of JettyHttpFileCacheTest[1].
> For this http route, the input stream of the response should not be closed
> even the exchange is done, as it will be used for sending the response for
> the route point.
>
> Here is the JIRA[1] why we added this test.
>
> [1]https://svn.apache.org/repos/asf/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/issues/JettyHttpFileCacheTest.java
> [2]https://issues.apache.org/activemq/browse/CAMEL-2776
>

Thanks I will look into it later today.



> Willem
>
> davsclaus@apache.org wrote:
>>
>> Author: davsclaus
>> Date: Wed Jul 14 08:09:58 2010
>> New Revision: 963966
>>
>> URL: http://svn.apache.org/viewvc?rev=963966&view=rev
>> Log:
>> CAMEL-2944: Fixed issue with stream cache spooled to files on Windows and
>> deleting the temporary files when Exchange done
>>
>> Modified:
>>
>>  camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
>>
>>  camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
>>
>>  camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
>>
>>  camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java
>>
>>  camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
>>    camel/trunk/camel-core/src/test/resources/log4j.properties
>>
>> Modified:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java?rev=963966&r1=963965&r2=963966&view=diff
>>
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
>> Wed Jul 14 08:09:58 2010
>> @@ -25,8 +25,6 @@ import java.io.FileOutputStream;
>>  import java.io.IOException;
>>  import java.io.InputStream;
>>  import java.io.OutputStream;
>> -import java.util.ArrayList;
>> -import java.util.List;
>>  import org.apache.camel.Exchange;
>>  import org.apache.camel.StreamCache;
>> @@ -54,9 +52,7 @@ public class CachedOutputStream extends      private
>> boolean inMemory = true;
>>     private int totalLength;
>>     private File tempFile;
>> -    private boolean exchangeOnCompleted;
>> -    -    private List<FileInputStreamCache> fileInputStreamCaches = new
>> ArrayList<FileInputStreamCache>(4);
>> +    private FileInputStreamCache fileInputStreamCache;
>>      private long threshold = 64 * 1024;
>>     private File outputDir;
>> @@ -76,12 +72,10 @@ public class CachedOutputStream extends
>>  @Override
>>             public void onDone(Exchange exchange) {
>>                 try {
>> -                    //set the flag so we can delete the temp file -
>>              exchangeOnCompleted = true;
>> -                    if (fileInputStreamCaches.size() == 0) {
>> -                        // there is no open fileInputStream let's close
>> it -                        close();
>> +                    if (fileInputStreamCache != null) {
>> +                        fileInputStreamCache.close();
>>                     }
>> +                    close();
>>                 } catch (Exception e) {
>>                     LOG.warn("Error deleting temporary cache file: " +
>> tempFile, e);
>>                 }
>> @@ -150,9 +144,10 @@ public class CachedOutputStream extends
>>  }
>>         } else {
>>             try {
>> -                FileInputStreamCache answer = new
>> FileInputStreamCache(tempFile, this);
>> -                fileInputStreamCaches.add(answer);
>> -                return answer;
>> +                if (fileInputStreamCache == null) {
>> +                    fileInputStreamCache = new
>> FileInputStreamCache(tempFile);
>> +                }
>> +                return fileInputStreamCache;
>>             } catch (FileNotFoundException e) {
>>                 throw IOHelper.createIOException("Cached file " + tempFile
>> + " not found", e);
>>             }
>> @@ -171,23 +166,16 @@ public class CachedOutputStream extends
>>  }
>>         } else {
>>             try {
>> -                FileInputStreamCache answer = new
>> FileInputStreamCache(tempFile, this);
>> -                fileInputStreamCaches.add(answer);
>> -                return answer;
>> +                if (fileInputStreamCache == null) {
>> +                    fileInputStreamCache = new
>> FileInputStreamCache(tempFile);
>> +                }
>> +                return fileInputStreamCache;
>>             } catch (FileNotFoundException e) {
>>                 throw IOHelper.createIOException("Cached file " + tempFile
>> + " not found", e);
>>             }
>>         }
>>     }
>> -    -    public void releaseFileInputStream(FileInputStreamCache
>> fileInputStreamCache) throws IOException {
>> -        fileInputStreamCaches.remove(fileInputStreamCache);
>> -        if (exchangeOnCompleted && fileInputStreamCaches.size() == 0) {
>> -            // now we can close stream and delete the temp file
>> -            close();
>> -        }
>> -    }
>> -    +
>>     private void cleanUpTempFile() {
>>         // cleanup temporary file
>>         if (tempFile != null) {
>>
>> Modified:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java?rev=963966&r1=963965&r2=963966&view=diff
>>
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
>> Wed Jul 14 08:09:58 2010
>> @@ -16,6 +16,7 @@
>>  */
>>  package org.apache.camel.converter.stream;
>>  +import java.io.Closeable;
>>  import java.io.File;
>>  import java.io.FileInputStream;
>>  import java.io.FileNotFoundException;
>> @@ -27,42 +28,31 @@ import org.apache.camel.RuntimeCamelExce
>>  import org.apache.camel.StreamCache;
>>  import org.apache.camel.util.IOHelper;
>>  -public class FileInputStreamCache extends InputStream implements
>> StreamCache {
>> +public class FileInputStreamCache extends InputStream implements
>> StreamCache, Closeable {
>>     private InputStream stream;
>> -    private CachedOutputStream cachedOutputStream;
>>     private File file;
>>  -    public FileInputStreamCache(File file, CachedOutputStream cos)
>> throws FileNotFoundException {
>> +    public FileInputStreamCache(File file) throws FileNotFoundException {
>>         this.file = file;
>> -        this.cachedOutputStream = cos;
>>         this.stream = new FileInputStream(file);
>>     }
>>          @Override
>>     public void close() {
>> -        try {
>> -            if (isSteamOpened()) {
>> -                getInputStream().close();
>> -            }
>> -            // Just remove the itself from cachedOutputStream
>> -            if (cachedOutputStream != null) {
>> -                cachedOutputStream.releaseFileInputStream(this);
>> -            }
>> -        } catch (Exception e) {
>> -            throw new RuntimeCamelException(e);
>> +        if (isSteamOpened()) {
>> +            IOHelper.close(getInputStream());
>>         }
>>     }
>>      @Override
>>     public void reset() {
>>         try {
>> -            if (isSteamOpened()) {
>> -                getInputStream().close();
>> -            }
>> +            // reset by closing and creating a new stream based on the
>> file
>> +            close();
>>             // reset by creating a new stream based on the file
>>             stream = new FileInputStream(file);
>>         } catch (Exception e) {
>> -            throw new RuntimeCamelException(e);
>> +            throw new RuntimeCamelException("Cannot reset stream from
>> file " + file, e);
>>         }                 }
>>
>> Modified:
>> camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java?rev=963966&r1=963965&r2=963966&view=diff
>>
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
>> Wed Jul 14 08:09:58 2010
>> @@ -165,6 +165,9 @@ public class DefaultTypeConverter extend
>>         // try to find a suitable type converter
>>         TypeConverter converter = getOrFindTypeConverter(type, value);
>>         if (converter != null) {
>> +            if (LOG.isTraceEnabled()) {
>> +                LOG.trace("Using converter: " + converter + " to convert
>> " + key);
>> +            }
>>             Object rc = converter.convertTo(type, exchange, value);
>>             if (rc != null) {
>>                 return rc;
>>
>> Modified:
>> camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java?rev=963966&r1=963965&r2=963966&view=diff
>>
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java
>> Wed Jul 14 08:09:58 2010
>> @@ -33,9 +33,8 @@ public class FileInputStreamCacheTest ex
>>     public void testFileInputStreamCache() throws Exception {
>>         Exchange exchange = new DefaultExchange(context);
>>  -        CachedOutputStream cos = new CachedOutputStream(exchange);
>>         File file = new File(TEST_FILE).getAbsoluteFile();
>> -        FileInputStreamCache cache = new FileInputStreamCache(file, cos);
>> +        FileInputStreamCache cache = new FileInputStreamCache(file);
>>          ByteArrayOutputStream bos = new ByteArrayOutputStream();
>>         cache.writeTo(bos);
>>
>> Modified:
>> camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java?rev=963966&r1=963965&r2=963966&view=diff
>>
>> ==============================================================================
>> ---
>> camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
>> (original)
>> +++
>> camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
>> Wed Jul 14 08:09:58 2010
>> @@ -89,8 +89,6 @@ public class EventNotifierRedeliveryEven
>>         template.sendBody("direct:start", "Hello World");
>>         assertMockEndpointsSatisfied();
>>  -        Thread.sleep(1000);
>> -
>>         assertEquals(9, events.size());
>>          assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(0));
>> @@ -112,7 +110,7 @@ public class EventNotifierRedeliveryEven
>>         context.addRoutes(new RouteBuilder() {
>>             @Override
>>             public void configure() throws Exception {
>> -
>>  errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(4).asyncDelayedRedelivery().redeliveryDelay(25));
>> +
>>  errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(4).asyncDelayedRedelivery().redeliveryDelay(100));
>>                  from("direct:start").throwException(new
>> IllegalArgumentException("Damn"));
>>             }
>> @@ -123,6 +121,8 @@ public class EventNotifierRedeliveryEven
>>         template.sendBody("direct:start", "Hello World");
>>         assertMockEndpointsSatisfied();
>>  +        Thread.sleep(500);
>> +
>>         assertEquals(9, events.size());
>>          assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(0));
>> @@ -135,9 +135,8 @@ public class EventNotifierRedeliveryEven
>>         e = assertIsInstanceOf(ExchangeRedeliveryEvent.class,
>> events.get(4));
>>         assertEquals(4, e.getAttempt());
>>         assertIsInstanceOf(ExchangeFailureHandledEvent.class,
>> events.get(5));
>> -        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
>> -        assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
>> -        assertIsInstanceOf(ExchangeSentEvent.class, events.get(8));
>> +
>> +        // since its async the ordering of the rest can be different
>> depending per OS and timing
>>     }
>>  }
>> \ No newline at end of file
>>
>> Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
>> URL:
>> http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=963966&r1=963965&r2=963966&view=diff
>>
>> ==============================================================================
>> --- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
>> +++ camel/trunk/camel-core/src/test/resources/log4j.properties Wed Jul 14
>> 08:09:58 2010
>> @@ -40,6 +40,8 @@ log4j.logger.org.apache.camel.impl.conve
>>  log4j.logger.org.apache.camel.management=WARN
>>  log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
>>  #log4j.logger.org.apache.camel.impl=TRACE
>> +#log4j.logger.org.apache.camel.util.FileUtil=TRACE
>> +#log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE
>>  # CONSOLE appender not used by default
>>  log4j.appender.out=org.apache.log4j.ConsoleAppender
>>
>>
>>
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: svn commit: r963966 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/converter/stream/ main/java/org/apache/camel/impl/converter/ test/java/org/apache/camel/converter/stream/ test/java/org/apache/camel/management/ test/resources/

Posted by Willem Jiang <wi...@gmail.com>.
Hi Claus,

You change broke the test of JettyHttpFileCacheTest[1].
For this http route, the input stream of the response should not be 
closed even the exchange is done, as it will be used for sending the 
response for the route point.

Here is the JIRA[1] why we added this test.

[1]https://svn.apache.org/repos/asf/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/issues/JettyHttpFileCacheTest.java
[2]https://issues.apache.org/activemq/browse/CAMEL-2776

Willem

davsclaus@apache.org wrote:
> Author: davsclaus
> Date: Wed Jul 14 08:09:58 2010
> New Revision: 963966
> 
> URL: http://svn.apache.org/viewvc?rev=963966&view=rev
> Log:
> CAMEL-2944: Fixed issue with stream cache spooled to files on Windows and deleting the temporary files when Exchange done
> 
> Modified:
>     camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
>     camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
>     camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
>     camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java
>     camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
>     camel/trunk/camel-core/src/test/resources/log4j.properties
> 
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java?rev=963966&r1=963965&r2=963966&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java Wed Jul 14 08:09:58 2010
> @@ -25,8 +25,6 @@ import java.io.FileOutputStream;
>  import java.io.IOException;
>  import java.io.InputStream;
>  import java.io.OutputStream;
> -import java.util.ArrayList;
> -import java.util.List;
>  
>  import org.apache.camel.Exchange;
>  import org.apache.camel.StreamCache;
> @@ -54,9 +52,7 @@ public class CachedOutputStream extends 
>      private boolean inMemory = true;
>      private int totalLength;
>      private File tempFile;
> -    private boolean exchangeOnCompleted;
> -    
> -    private List<FileInputStreamCache> fileInputStreamCaches = new ArrayList<FileInputStreamCache>(4);
> +    private FileInputStreamCache fileInputStreamCache;
>  
>      private long threshold = 64 * 1024;
>      private File outputDir;
> @@ -76,12 +72,10 @@ public class CachedOutputStream extends 
>              @Override
>              public void onDone(Exchange exchange) {
>                  try {
> -                    //set the flag so we can delete the temp file 
> -                    exchangeOnCompleted = true;
> -                    if (fileInputStreamCaches.size() == 0) {
> -                        // there is no open fileInputStream let's close it 
> -                        close();
> +                    if (fileInputStreamCache != null) {
> +                        fileInputStreamCache.close();
>                      }
> +                    close();
>                  } catch (Exception e) {
>                      LOG.warn("Error deleting temporary cache file: " + tempFile, e);
>                  }
> @@ -150,9 +144,10 @@ public class CachedOutputStream extends 
>              }
>          } else {
>              try {
> -                FileInputStreamCache answer = new FileInputStreamCache(tempFile, this);
> -                fileInputStreamCaches.add(answer);
> -                return answer;
> +                if (fileInputStreamCache == null) {
> +                    fileInputStreamCache = new FileInputStreamCache(tempFile);
> +                }
> +                return fileInputStreamCache;
>              } catch (FileNotFoundException e) {
>                  throw IOHelper.createIOException("Cached file " + tempFile + " not found", e);
>              }
> @@ -171,23 +166,16 @@ public class CachedOutputStream extends 
>              }
>          } else {
>              try {
> -                FileInputStreamCache answer = new FileInputStreamCache(tempFile, this);
> -                fileInputStreamCaches.add(answer);
> -                return answer;
> +                if (fileInputStreamCache == null) {
> +                    fileInputStreamCache = new FileInputStreamCache(tempFile);
> +                }
> +                return fileInputStreamCache;
>              } catch (FileNotFoundException e) {
>                  throw IOHelper.createIOException("Cached file " + tempFile + " not found", e);
>              }
>          }
>      }
> -    
> -    public void releaseFileInputStream(FileInputStreamCache fileInputStreamCache) throws IOException {
> -        fileInputStreamCaches.remove(fileInputStreamCache);
> -        if (exchangeOnCompleted && fileInputStreamCaches.size() == 0) {
> -            // now we can close stream and delete the temp file
> -            close();
> -        }
> -    }
> -    
> +
>      private void cleanUpTempFile() {
>          // cleanup temporary file
>          if (tempFile != null) {
> 
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java?rev=963966&r1=963965&r2=963966&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java Wed Jul 14 08:09:58 2010
> @@ -16,6 +16,7 @@
>   */
>  package org.apache.camel.converter.stream;
>  
> +import java.io.Closeable;
>  import java.io.File;
>  import java.io.FileInputStream;
>  import java.io.FileNotFoundException;
> @@ -27,42 +28,31 @@ import org.apache.camel.RuntimeCamelExce
>  import org.apache.camel.StreamCache;
>  import org.apache.camel.util.IOHelper;
>  
> -public class FileInputStreamCache extends InputStream implements StreamCache {
> +public class FileInputStreamCache extends InputStream implements StreamCache, Closeable {
>      private InputStream stream;
> -    private CachedOutputStream cachedOutputStream;
>      private File file;
>  
> -    public FileInputStreamCache(File file, CachedOutputStream cos) throws FileNotFoundException {
> +    public FileInputStreamCache(File file) throws FileNotFoundException {
>          this.file = file;
> -        this.cachedOutputStream = cos;
>          this.stream = new FileInputStream(file);
>      }
>      
>      @Override
>      public void close() {
> -        try {
> -            if (isSteamOpened()) {
> -                getInputStream().close();
> -            }
> -            // Just remove the itself from cachedOutputStream
> -            if (cachedOutputStream != null) {
> -                cachedOutputStream.releaseFileInputStream(this);
> -            }
> -        } catch (Exception e) {
> -            throw new RuntimeCamelException(e);
> +        if (isSteamOpened()) {
> +            IOHelper.close(getInputStream());
>          }
>      }
>  
>      @Override
>      public void reset() {
>          try {
> -            if (isSteamOpened()) {
> -                getInputStream().close();
> -            }
> +            // reset by closing and creating a new stream based on the file
> +            close();
>              // reset by creating a new stream based on the file
>              stream = new FileInputStream(file);
>          } catch (Exception e) {
> -            throw new RuntimeCamelException(e);
> +            throw new RuntimeCamelException("Cannot reset stream from file " + file, e);
>          }            
>      }
>  
> 
> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java?rev=963966&r1=963965&r2=963966&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java (original)
> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java Wed Jul 14 08:09:58 2010
> @@ -165,6 +165,9 @@ public class DefaultTypeConverter extend
>          // try to find a suitable type converter
>          TypeConverter converter = getOrFindTypeConverter(type, value);
>          if (converter != null) {
> +            if (LOG.isTraceEnabled()) {
> +                LOG.trace("Using converter: " + converter + " to convert " + key);
> +            }
>              Object rc = converter.convertTo(type, exchange, value);
>              if (rc != null) {
>                  return rc;
> 
> Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java?rev=963966&r1=963965&r2=963966&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java (original)
> +++ camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/FileInputStreamCacheTest.java Wed Jul 14 08:09:58 2010
> @@ -33,9 +33,8 @@ public class FileInputStreamCacheTest ex
>      public void testFileInputStreamCache() throws Exception {
>          Exchange exchange = new DefaultExchange(context);
>  
> -        CachedOutputStream cos = new CachedOutputStream(exchange);
>          File file = new File(TEST_FILE).getAbsoluteFile();
> -        FileInputStreamCache cache = new FileInputStreamCache(file, cos);
> +        FileInputStreamCache cache = new FileInputStreamCache(file);
>  
>          ByteArrayOutputStream bos = new ByteArrayOutputStream();
>          cache.writeTo(bos);
> 
> Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java?rev=963966&r1=963965&r2=963966&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java (original)
> +++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java Wed Jul 14 08:09:58 2010
> @@ -89,8 +89,6 @@ public class EventNotifierRedeliveryEven
>          template.sendBody("direct:start", "Hello World");
>          assertMockEndpointsSatisfied();
>  
> -        Thread.sleep(1000);
> -
>          assertEquals(9, events.size());
>  
>          assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(0));
> @@ -112,7 +110,7 @@ public class EventNotifierRedeliveryEven
>          context.addRoutes(new RouteBuilder() {
>              @Override
>              public void configure() throws Exception {
> -                errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(4).asyncDelayedRedelivery().redeliveryDelay(25));
> +                errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(4).asyncDelayedRedelivery().redeliveryDelay(100));
>  
>                  from("direct:start").throwException(new IllegalArgumentException("Damn"));
>              }
> @@ -123,6 +121,8 @@ public class EventNotifierRedeliveryEven
>          template.sendBody("direct:start", "Hello World");
>          assertMockEndpointsSatisfied();
>  
> +        Thread.sleep(500);
> +
>          assertEquals(9, events.size());
>  
>          assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(0));
> @@ -135,9 +135,8 @@ public class EventNotifierRedeliveryEven
>          e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(4));
>          assertEquals(4, e.getAttempt());
>          assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
> -        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
> -        assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
> -        assertIsInstanceOf(ExchangeSentEvent.class, events.get(8));
> +
> +        // since its async the ordering of the rest can be different depending per OS and timing
>      }
>  
>  }
> \ No newline at end of file
> 
> Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=963966&r1=963965&r2=963966&view=diff
> ==============================================================================
> --- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
> +++ camel/trunk/camel-core/src/test/resources/log4j.properties Wed Jul 14 08:09:58 2010
> @@ -40,6 +40,8 @@ log4j.logger.org.apache.camel.impl.conve
>  log4j.logger.org.apache.camel.management=WARN
>  log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
>  #log4j.logger.org.apache.camel.impl=TRACE
> +#log4j.logger.org.apache.camel.util.FileUtil=TRACE
> +#log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE
>  
>  # CONSOLE appender not used by default
>  log4j.appender.out=org.apache.log4j.ConsoleAppender
> 
> 
>